mirror of
https://gh.wpcy.net/https://github.com/DecisiveDesignDE/wp-quickstart-installer.git
synced 2026-04-26 11:21:17 +08:00
- Added a new step (Step02_5) for users to select a WordPress setup type (e.g., Elementor, Blog, All Items). - Modified Step03 to dynamically display plugins and themes based on the chosen setup. - Updated APP_CONFIG (defined in step03.py) to store setup configurations, including plugin/theme details (id, name, icon, path). - Changed selections.json to store a list of dictionaries for selected items, providing richer data (including paths) for the installation process. - Updated Step05 to correctly parse the new selections.json structure and use item names for downloading. - Adjusted navigation in SetupApp.py, step02.py, and step03.py to accommodate the new flow.
677 lines
No EOL
33 KiB
Python
677 lines
No EOL
33 KiB
Python
import tkinter as tk
|
|
from tkinter import messagebox
|
|
from tkinter import ttk
|
|
import subprocess
|
|
import threading
|
|
import winsound
|
|
import json
|
|
import sys
|
|
import tempfile
|
|
import os
|
|
import requests
|
|
import zipfile
|
|
import shutil
|
|
from ftplib import FTP
|
|
import paramiko
|
|
import base64
|
|
|
|
PLUGIN_SLUGS = {
|
|
"Elementor": "elementor",
|
|
"Contact Form 7": "contact-form-7",
|
|
"Yoast SEO": "wordpress-seo",
|
|
"WooCommerce": "woocommerce",
|
|
"LiteSpeed Cache": "litespeed-cache",
|
|
"Really Simple SSL": "really-simple-ssl",
|
|
"Yoast Duplicate Post": "duplicate-post",
|
|
"WP Mail SMTP": "wp-mail-smtp",
|
|
"Autoptimize": "autoptimize",
|
|
"Duplicator": "duplicator",
|
|
"WP Fastest Cache": "wp-fastest-cache"
|
|
}
|
|
|
|
THEME_SLUGS = {
|
|
"Hello Elementor": "hello-elementor",
|
|
"Astra": "astra",
|
|
"Kadence": "kadence",
|
|
"GeneratePress": "generatepress",
|
|
"Storefront": "storefront",
|
|
"Hello Biz": "hello-biz"
|
|
}
|
|
|
|
class Step05(tk.Frame):
|
|
def __init__(self, parent, controller, selected_plugins, selected_themes):
|
|
super().__init__(parent)
|
|
self.controller = controller
|
|
self.selected_plugins = selected_plugins
|
|
self.selected_themes = selected_themes
|
|
self.configure(bg="#f4f4f4")
|
|
|
|
self.installation_thread = None # Initialize the installation thread
|
|
self.cancel_event = threading.Event()
|
|
self.temp_folder_path = None # To store path of temp folder for cleanup
|
|
|
|
# Define status item keys
|
|
self.STATUS_DOWNLOADING_WP = "Downloading WordPress..."
|
|
self.STATUS_DOWNLOADING_THEMES = "Downloading Themes..."
|
|
self.STATUS_DOWNLOADING_PLUGINS = "Downloading Plugins..."
|
|
self.STATUS_CREATING_ZIP = "Creating .zip..."
|
|
self.STATUS_UPLOADING_FTP = "Uploading .zip to FTP..."
|
|
self.STATUS_FINISHING_UP = "Finishing up..."
|
|
|
|
# Headline
|
|
title_label = tk.Label(self, text="Installation", font=("Arial", 18, "bold"), bg="#f4f4f4")
|
|
title_label.pack(pady=(20, 10))
|
|
|
|
# Installation Notice
|
|
notice_text = (
|
|
"We will now install WordPress, the selected Plugin(s) and Theme(s) to your server. \n"
|
|
"Again, please make sure to backup your files if the folder is not empty."
|
|
)
|
|
notice_label = tk.Label(self, text=notice_text, wraplength=750, justify="left", bg="#f4f4f4", font=("Arial", 12))
|
|
notice_label.pack(pady=(10, 20), padx=20)
|
|
|
|
# Create the new checklist UI
|
|
self.checklist_frame = tk.Frame(self, bg="#f4f4f4")
|
|
self.checklist_frame.pack(pady=10, padx=20, fill=tk.X, anchor="w") # Fill horizontally, anchor west
|
|
|
|
self.status_vars = {}
|
|
self.status_labels = {}
|
|
|
|
status_items_text = [
|
|
self.STATUS_DOWNLOADING_WP,
|
|
self.STATUS_DOWNLOADING_THEMES,
|
|
self.STATUS_DOWNLOADING_PLUGINS,
|
|
self.STATUS_CREATING_ZIP,
|
|
self.STATUS_UPLOADING_FTP,
|
|
self.STATUS_FINISHING_UP
|
|
]
|
|
|
|
for item_text in status_items_text:
|
|
var = tk.StringVar()
|
|
var.set(f"❌ {item_text}") # Initial state
|
|
self.status_vars[item_text] = var
|
|
|
|
label = tk.Label(self.checklist_frame, textvariable=var, font=("Arial", 12), bg="#f4f4f4", anchor="w", justify="left")
|
|
label.pack(fill=tk.X) # Fill horizontally to use anchor
|
|
self.status_labels[item_text] = label
|
|
|
|
# Start Button
|
|
start_button = ttk.Button(self, text="START", command=self.start_installation, style="Bold.TButton")
|
|
start_button.pack(pady=10)
|
|
self.start_button = start_button # Store reference to the start button
|
|
|
|
# Button frame
|
|
button_frame = tk.Frame(self, bg="#f4f4f4")
|
|
button_frame.pack(pady=20)
|
|
|
|
prev_button = ttk.Button(button_frame, text="< Prev", command=lambda: controller.show_frame("Step04"))
|
|
prev_button.grid(row=0, column=0, padx=10)
|
|
self.prev_button = prev_button # Store reference to the prev button
|
|
|
|
# Cancel Button (new)
|
|
self.cancel_button = ttk.Button(button_frame, text="Cancel", command=self.request_cancel_confirmation, state=tk.DISABLED)
|
|
self.cancel_button.grid(row=0, column=1, padx=10)
|
|
|
|
self.next_button = ttk.Button(button_frame, text="> Next", command=lambda: controller.show_frame("Step06"), state=tk.DISABLED)
|
|
self.next_button.grid(row=0, column=2, padx=10) # Shifted to column 2
|
|
|
|
# Styling
|
|
style = ttk.Style()
|
|
style.configure("Bold.TButton", font=("Arial", 12, "bold"), padding=10)
|
|
|
|
def start_installation(self):
|
|
self.start_button.pack_forget() # Remove the start button
|
|
self.progress_label = tk.Label(self, text="Now installing your WordPress, Themes and Plugins...", font=("Arial", 12), bg="#f4f4f4")
|
|
self.progress_label.pack(pady=10)
|
|
self.progress_bar = ttk.Progressbar(self, orient="horizontal", length=400, mode="determinate")
|
|
self.progress_bar.pack(pady=10)
|
|
self.progress_bar["value"] = 0
|
|
self.progress_bar["maximum"] = 100
|
|
|
|
# self.status_list.config(state=tk.NORMAL)
|
|
# self.status_list.insert(tk.END, "Starting installation...\nPlease wait as this may take a few minutes.\n")
|
|
# self.status_list.config(state=tk.DISABLED)
|
|
self.next_button.config(state=tk.DISABLED)
|
|
self.prev_button.config(state=tk.DISABLED) # Disable the prev button
|
|
self.cancel_button.config(state=tk.NORMAL) # Enable Cancel button
|
|
self.cancel_event.clear() # Reset event for new installation
|
|
self.temp_folder_path = None # Reset temp folder path
|
|
|
|
# Update confirmation message
|
|
self.confirm_label = tk.Label(self, text="This may take several minutes depending on how many Themes and Plugins were chosen. \nPlease be patient and do not close the installation window.", font=("Arial", 12, "bold"), bg="#f4f4f4")
|
|
self.confirm_label.pack(pady=10)
|
|
|
|
def run_installation():
|
|
try:
|
|
temp_folder, _ = self.create_temp_folder()
|
|
self.temp_folder_path = temp_folder # Store for cleanup
|
|
if self.cancel_event.is_set(): return
|
|
|
|
if self.cancel_event.is_set(): return
|
|
wordpress_zip, _ = self.download_wordpress(temp_folder)
|
|
if self.cancel_event.is_set(): return
|
|
wordpress_folder, _ = self.unzip_wordpress(wordpress_zip, temp_folder)
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_DOWNLOADING_WP, success=wordpress_folder is not None))
|
|
if self.cancel_event.is_set(): return
|
|
|
|
all_plugins_downloaded = True
|
|
plugin_files = []
|
|
# self.selected_plugins is now a list of dictionaries
|
|
for plugin_data in self.selected_plugins:
|
|
if self.cancel_event.is_set(): return
|
|
# Pass the plugin name (which is used as key in PLUGIN_SLUGS)
|
|
file_path, _ = self.download_plugin(plugin_data["name"], temp_folder)
|
|
if file_path is None:
|
|
all_plugins_downloaded = False
|
|
else:
|
|
plugin_files.append(file_path)
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_DOWNLOADING_PLUGINS, success=all_plugins_downloaded))
|
|
if self.cancel_event.is_set(): return
|
|
|
|
all_themes_downloaded = True
|
|
theme_files = []
|
|
# self.selected_themes is now a list of dictionaries
|
|
for theme_data in self.selected_themes:
|
|
if self.cancel_event.is_set(): return
|
|
# Pass the theme name (which is used as key in THEME_SLUGS)
|
|
file_path, _ = self.download_theme(theme_data["name"], temp_folder)
|
|
if file_path is None:
|
|
all_themes_downloaded = False
|
|
else:
|
|
theme_files.append(file_path)
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_DOWNLOADING_THEMES, success=all_themes_downloaded))
|
|
if self.cancel_event.is_set(): return
|
|
|
|
successfully_downloaded_plugin_zips = [pf for pf in plugin_files if pf is not None and os.path.exists(pf)]
|
|
successfully_downloaded_theme_zips = [tf for tf in theme_files if tf is not None and os.path.exists(tf)]
|
|
|
|
if self.cancel_event.is_set(): return
|
|
messages = self.install_plugins_and_themes(successfully_downloaded_plugin_zips, successfully_downloaded_theme_zips, wordpress_folder)
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_CREATING_ZIP, success=True))
|
|
if self.cancel_event.is_set(): return
|
|
|
|
with open("connection_data.json", "r") as f:
|
|
connection_data = json.load(f)
|
|
if self.cancel_event.is_set(): return
|
|
|
|
if "password" in connection_data:
|
|
encoded_password = connection_data["password"]
|
|
decoded_password_bytes = base64.b64decode(encoded_password.encode('utf-8'))
|
|
connection_data["password"] = decoded_password_bytes.decode('utf-8')
|
|
|
|
server = connection_data.get("server")
|
|
username = connection_data.get("username")
|
|
password = connection_data.get("password")
|
|
port = connection_data.get("port")
|
|
connection_type = connection_data.get("connection_type")
|
|
if self.cancel_event.is_set(): return
|
|
|
|
upload_successful = False
|
|
if connection_type == "FTP":
|
|
upload_successful = self.upload_via_ftp(server, username, password, port, wordpress_folder, self.cancel_event)
|
|
else:
|
|
upload_successful = self.upload_via_sftp(server, username, password, port, wordpress_folder, self.cancel_event)
|
|
|
|
if self.cancel_event.is_set(): return # Check immediately after upload methods which might return due to cancellation
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_UPLOADING_FTP, success=upload_successful))
|
|
if not upload_successful and not self.cancel_event.is_set(): # If upload failed for non-cancel reason
|
|
raise Exception("File upload failed.")
|
|
|
|
|
|
if self.cancel_event.is_set(): return
|
|
try:
|
|
if wordpress_zip and os.path.exists(wordpress_zip):
|
|
os.remove(wordpress_zip)
|
|
if temp_folder and os.path.exists(temp_folder): # temp_folder is self.temp_folder_path
|
|
shutil.rmtree(temp_folder)
|
|
self.temp_folder_path = None # Reset after successful deletion
|
|
except Exception as e:
|
|
print(f"Error during successful cleanup: {e}")
|
|
|
|
if self.cancel_event.is_set(): return
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_FINISHING_UP, success=True))
|
|
self.next_button.config(state=tk.NORMAL)
|
|
self.prev_button.config(state=tk.NORMAL)
|
|
self.cancel_button.config(state=tk.DISABLED) # Installation complete
|
|
self.show_success_message()
|
|
|
|
if self.cancel_event.is_set(): return # Final check before sensitive file cleanup
|
|
|
|
print("Performing cleanup of sensitive files after successful installation...")
|
|
try:
|
|
if os.path.exists("selections.json"):
|
|
os.remove("selections.json")
|
|
print("Deleted selections.json on success.")
|
|
except Exception as e_sel:
|
|
print(f"Error deleting selections.json on success: {e_sel}")
|
|
|
|
try:
|
|
if os.path.exists("connection_data.json"):
|
|
os.remove("connection_data.json")
|
|
print("Deleted connection_data.json on success.")
|
|
except Exception as e_conn:
|
|
print(f"Error deleting connection_data.json on success: {e_conn}")
|
|
|
|
except Exception as e:
|
|
if not self.cancel_event.is_set(): # Only process as error if not cancelled
|
|
print(f"An error occurred during installation: {e}")
|
|
for key, var in self.status_vars.items():
|
|
if not var.get().startswith("✅"):
|
|
self.controller.after(0, lambda k=key: self.update_checklist_item(k, success=False))
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_FINISHING_UP, success=False))
|
|
# UI cleanup for error state
|
|
self.prev_button.config(state=tk.NORMAL)
|
|
self.cancel_button.config(state=tk.DISABLED)
|
|
|
|
|
|
self.installation_thread = threading.Thread(target=run_installation)
|
|
self.installation_thread.start()
|
|
|
|
def update_checklist_item(self, item_key, success):
|
|
if item_key in self.status_vars:
|
|
current_text = item_key # Get the base text without the icon
|
|
if success:
|
|
self.status_vars[item_key].set(f"✅ {current_text}")
|
|
else:
|
|
self.status_vars[item_key].set(f"⚠️ {current_text} (Failed)")
|
|
|
|
def stop(self):
|
|
if self.installation_thread and self.installation_thread.is_alive():
|
|
self.installation_thread.join(timeout=1) # Attempt to stop the thread
|
|
|
|
def create_temp_folder(self):
|
|
temp_dir = tempfile.mkdtemp()
|
|
return temp_dir, f"Temporary folder created at: {temp_dir}"
|
|
|
|
def download_wordpress(self, temp_folder):
|
|
url = "https://wordpress.org/latest.zip"
|
|
file_path = os.path.join(temp_folder, "wordpress.zip")
|
|
|
|
response = requests.get(url, stream=True)
|
|
|
|
if response.status_code == 200:
|
|
with open(file_path, "wb") as file:
|
|
for chunk in response.iter_content(chunk_size=1024):
|
|
file.write(chunk)
|
|
return file_path, f"WordPress downloaded successfully to {file_path}"
|
|
else:
|
|
return None, "Failed to download WordPress."
|
|
|
|
def unzip_wordpress(self, zip_path, temp_folder):
|
|
if zip_path and os.path.exists(zip_path):
|
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
|
zip_ref.extractall(temp_folder)
|
|
extracted_path = os.path.join(temp_folder, "wordpress")
|
|
if os.path.exists(extracted_path):
|
|
return extracted_path, f"WordPress extracted to {extracted_path}"
|
|
|
|
return None, "WordPress zip file not found or extraction failed."
|
|
|
|
def extract_zip(self, file_path, destination_folder):
|
|
if file_path and os.path.exists(file_path):
|
|
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
|
zip_ref.extractall(destination_folder)
|
|
return f"Extracted {file_path} to {destination_folder}"
|
|
return f"Extraction failed for {file_path}"
|
|
|
|
def get_plugin_download_url(self, plugin_name):
|
|
plugin_slug = PLUGIN_SLUGS.get(plugin_name, plugin_name.lower().replace(" ", "-")) # Use correct slug
|
|
api_url = f"https://api.wordpress.org/plugins/info/1.2/?action=plugin_information&slug={plugin_slug}"
|
|
response = requests.get(api_url)
|
|
|
|
print(f"🔎 Plugin API URL: {api_url}") # Debugging
|
|
print(f"🔎 API Response Status: {response.status_code}")
|
|
print(f"🔎 API Response Text: {response.text}")
|
|
|
|
if response.status_code == 200:
|
|
try:
|
|
data = response.json()
|
|
if "download_link" in data:
|
|
return data["download_link"]
|
|
else:
|
|
print(f"❌ No download link found in API response for {plugin_name}: {data}")
|
|
return None
|
|
except json.JSONDecodeError:
|
|
print(f"❌ JSON decoding error for {plugin_name}. Response: {response.text}")
|
|
return None
|
|
else:
|
|
print(f"❌ API request failed for {plugin_name}. Status code: {response.status_code}")
|
|
return None
|
|
|
|
def get_theme_download_url(self, theme_name):
|
|
theme_slug = THEME_SLUGS.get(theme_name, theme_name.lower().replace(" ", "-")) # Use correct slug
|
|
api_url = f"https://api.wordpress.org/themes/info/1.2/?action=theme_information&slug={theme_slug}"
|
|
response = requests.get(api_url)
|
|
|
|
print(f"🔎 Theme API URL: {api_url}") # Debugging
|
|
print(f"🔎 API Response Status: {response.status_code}")
|
|
print(f"🔎 API Response Text: {response.text}")
|
|
|
|
if response.status_code == 200:
|
|
try:
|
|
data = response.json()
|
|
if "download_link" in data:
|
|
return data["download_link"]
|
|
else:
|
|
print(f"❌ No download link found in API response for {theme_name}: {data}")
|
|
return None
|
|
except json.JSONDecodeError:
|
|
print(f"❌ JSON decoding error for {theme_name}. Response: {response.text}")
|
|
return None
|
|
else:
|
|
print(f"❌ API request failed for {theme_name}. Status code: {response.status_code}")
|
|
return None
|
|
|
|
def download_plugin(self, plugin_slug, temp_folder):
|
|
plugin_url = self.get_plugin_download_url(plugin_slug)
|
|
if not plugin_url:
|
|
return None, f"Failed to retrieve download URL for {plugin_slug}"
|
|
|
|
file_path = os.path.join(temp_folder, f"{plugin_slug}.zip")
|
|
response = requests.get(plugin_url, stream=True)
|
|
|
|
if response.status_code == 200:
|
|
with open(file_path, "wb") as file:
|
|
for chunk in response.iter_content(chunk_size=1024):
|
|
file.write(chunk)
|
|
if os.path.exists(file_path) and os.path.getsize(file_path) > 0: # ✅ Ensure file exists before returning success
|
|
return file_path, f"{plugin_slug} downloaded successfully to {file_path}"
|
|
else:
|
|
return None, f"❌ Download failed: {plugin_slug}.zip is empty!"
|
|
else:
|
|
return None, f"❌ Failed to download {plugin_slug}. Status code: {response.status_code}"
|
|
|
|
def download_theme(self, theme_slug, temp_folder):
|
|
theme_url = self.get_theme_download_url(theme_slug)
|
|
if not theme_url:
|
|
return None, f"Failed to retrieve download URL for {theme_slug}"
|
|
|
|
file_path = os.path.join(temp_folder, f"{theme_slug}.zip")
|
|
response = requests.get(theme_url, stream=True)
|
|
|
|
if response.status_code == 200:
|
|
with open(file_path, "wb") as file:
|
|
for chunk in response.iter_content(chunk_size=1024):
|
|
file.write(chunk)
|
|
if os.path.exists(file_path) and os.path.getsize(file_path) > 0: # ✅ Ensure file exists before returning success
|
|
return file_path, f"{theme_slug} downloaded successfully to {file_path}"
|
|
else:
|
|
return None, f"❌ Download failed: {theme_slug}.zip is empty!"
|
|
else:
|
|
return None, f"❌ Failed to download {theme_slug}. Status code: {response.status_code}"
|
|
|
|
def install_plugins_and_themes(self, plugin_files, theme_files, wordpress_folder):
|
|
plugins_folder = os.path.join(wordpress_folder, "wp-content", "plugins")
|
|
themes_folder = os.path.join(wordpress_folder, "wp-content", "themes")
|
|
os.makedirs(plugins_folder, exist_ok=True)
|
|
os.makedirs(themes_folder, exist_ok=True)
|
|
|
|
messages = []
|
|
for plugin in plugin_files:
|
|
if plugin:
|
|
messages.append(self.extract_zip(plugin, plugins_folder))
|
|
|
|
for theme in theme_files:
|
|
if theme:
|
|
messages.append(self.extract_zip(theme, themes_folder))
|
|
|
|
messages.append("All plugins and themes installed successfully.")
|
|
return messages
|
|
|
|
def upload_via_ftp(self, server, username, password, port, wordpress_folder, cancel_event):
|
|
ftp = None
|
|
try:
|
|
ftp = FTP()
|
|
# Add cancel check before potentially long connection attempt
|
|
if cancel_event.is_set(): return False
|
|
ftp.connect(server, int(port)) # This can block, consider timeout for ftp.connect if possible
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
ftp.login(username, password)
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
|
|
def ftp_mkdirs(ftp_obj, path):
|
|
dirs = path.split("/")
|
|
current_path = ""
|
|
for dir_item in dirs:
|
|
if cancel_event.is_set(): return False # Check inside loop
|
|
if dir_item:
|
|
current_path += f"/{dir_item}"
|
|
try:
|
|
ftp_obj.cwd(current_path)
|
|
except Exception:
|
|
try:
|
|
ftp_obj.mkd(current_path)
|
|
ftp_obj.cwd(current_path)
|
|
except Exception as e:
|
|
print(f"FTP: Failed to create directory {current_path}: {e}")
|
|
if ftp_obj: ftp_obj.quit()
|
|
return False
|
|
return True
|
|
|
|
total_files = sum([len(files) for _, _, files in os.walk(wordpress_folder)])
|
|
uploaded_files = 0
|
|
|
|
for root, dirs, files in os.walk(wordpress_folder):
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
relative_path = os.path.relpath(root, wordpress_folder).replace("\\", "/")
|
|
if relative_path != ".":
|
|
if not ftp_mkdirs(ftp, relative_path): # This now checks cancel_event
|
|
if cancel_event.is_set(): ftp.quit(); return False # ftp_mkdirs might return False due to cancellation
|
|
print(f"FTP: Skipping folder due to directory creation failure: {relative_path}")
|
|
continue
|
|
|
|
for file_item in files:
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
file_path = os.path.join(root, file_item)
|
|
remote_path = f"{relative_path}/{file_item}".replace("\\", "/")
|
|
if remote_path.startswith("/."): remote_path = remote_path[1:]
|
|
|
|
if os.path.exists(file_path):
|
|
try:
|
|
print(f"FTP Uploading: {file_path} -> {remote_path}")
|
|
with open(file_path, "rb") as f:
|
|
ftp.storbinary(f"STOR {remote_path}", f) # This can block
|
|
if cancel_event.is_set(): ftp.quit(); return False # Check after blocking operation
|
|
uploaded_files += 1
|
|
self.controller.after(0, lambda v=(uploaded_files / total_files) * 100: self.progress_bar.config(value=v))
|
|
except Exception as e:
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
print(f"FTP: Failed to upload {file_path} -> {remote_path}: {e}")
|
|
else:
|
|
print(f"FTP: Skipping missing file: {file_path}")
|
|
ftp.quit()
|
|
return True
|
|
except Exception as e:
|
|
if ftp: ftp.quit()
|
|
if cancel_event.is_set(): return False # Check if exception was due to cancellation signal
|
|
print(f"❌ An error occurred during FTP upload: {e}")
|
|
return False
|
|
|
|
def upload_via_sftp(self, server, username, password, port, wordpress_folder, cancel_event):
|
|
transport = None
|
|
sftp = None
|
|
try:
|
|
if cancel_event.is_set(): return False
|
|
transport = paramiko.Transport((server, int(port)))
|
|
# transport.connect can block, consider how to make it interruptible if too long.
|
|
# For now, check event before and after.
|
|
if cancel_event.is_set(): return False
|
|
transport.connect(username=username, password=password)
|
|
if cancel_event.is_set(): transport.close(); return False
|
|
sftp = paramiko.SFTPClient.from_transport(transport)
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
|
|
def sftp_mkdirs(sftp_obj, path):
|
|
dirs = path.split("/")
|
|
current_path = ""
|
|
for dir_item in dirs:
|
|
if cancel_event.is_set(): return False
|
|
if dir_item:
|
|
current_path += f"/{dir_item}"
|
|
try:
|
|
sftp_obj.stat(current_path)
|
|
except FileNotFoundError:
|
|
try:
|
|
sftp_obj.mkdir(current_path)
|
|
except Exception as e:
|
|
print(f"SFTP: Failed to create directory {current_path}: {e}")
|
|
if sftp_obj: sftp_obj.close()
|
|
if transport: transport.close()
|
|
return False
|
|
return True
|
|
|
|
total_files = sum([len(files) for _, _, files in os.walk(wordpress_folder)])
|
|
uploaded_files = 0
|
|
|
|
for root, dirs, files in os.walk(wordpress_folder):
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
relative_path = os.path.relpath(root, wordpress_folder).replace("\\", "/")
|
|
if relative_path != ".":
|
|
if not sftp_mkdirs(sftp, relative_path): # This now checks cancel_event
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
print(f"SFTP: Skipping folder due to directory creation failure: {relative_path}")
|
|
continue
|
|
|
|
for file_item in files:
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
file_path = os.path.join(root, file_item)
|
|
remote_path = os.path.join(relative_path, file_item).replace("\\", "/")
|
|
if remote_path.startswith("./"): remote_path = remote_path[2:]
|
|
if remote_path.startswith("/"): remote_path = remote_path[1:]
|
|
|
|
if os.path.exists(file_path):
|
|
try:
|
|
print(f"SFTP Uploading: {file_path} -> {remote_path}")
|
|
sftp.put(file_path, remote_path) # This can block
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
uploaded_files += 1
|
|
self.controller.after(0, lambda v=(uploaded_files / total_files) * 100: self.progress_bar.config(value=v))
|
|
except Exception as e:
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
print(f"SFTP: Failed to upload {file_path} -> {remote_path}: {e}")
|
|
else:
|
|
print(f"SFTP: Skipping missing file: {file_path}")
|
|
sftp.close()
|
|
transport.close()
|
|
return True
|
|
except Exception as e:
|
|
if sftp: sftp.close()
|
|
if transport: transport.close()
|
|
if cancel_event.is_set(): return False
|
|
print(f"❌ An error occurred during SFTP upload: {e}")
|
|
return False
|
|
|
|
def show_success_message(self):
|
|
success_window = tk.Toplevel(self)
|
|
success_window.title("Success")
|
|
success_window.geometry("300x150")
|
|
success_window.configure(bg="#f4f4f4")
|
|
|
|
success_label = tk.Label(success_window, text="Congratulations! Installation succeeded.", font=("Arial", 12), bg="#f4f4f4")
|
|
success_label.pack(pady=20)
|
|
|
|
ok_button = ttk.Button(success_window, text="OK", command=success_window.destroy, style="Bold.TButton")
|
|
ok_button.pack(pady=10)
|
|
|
|
winsound.PlaySound("SystemAsterisk", winsound.SND_ALIAS)
|
|
|
|
def update_selections(self, selected_plugins, selected_themes):
|
|
self.selected_plugins = selected_plugins
|
|
self.selected_themes = selected_themes
|
|
# self.populate_install_list()
|
|
|
|
def request_cancel_confirmation(self):
|
|
# Play sound effect (same as success for now, as per user request)
|
|
winsound.PlaySound("SystemAsterisk", winsound.SND_ALIAS | winsound.SND_ASYNC) # SND_ASYNC to avoid blocking
|
|
|
|
response = messagebox.askyesno(
|
|
title="Confirm Cancellation",
|
|
message="This will abort the installation process completely. If you wanna use the tool again, you will have to restart from scratch. Is that okay?",
|
|
parent=self # Ensure dialog is on top of the current window
|
|
)
|
|
|
|
if response: # True if "Yes" is clicked
|
|
self.initiate_cancellation_process()
|
|
else:
|
|
# User clicked "No", do nothing and let installation continue
|
|
print("User chose not to cancel.")
|
|
pass
|
|
|
|
def initiate_cancellation_process(self):
|
|
print("Initiating cancellation process...")
|
|
# This method will be fleshed out in the next plan step to:
|
|
print("Initiating cancellation process...")
|
|
self.cancel_event.set() # Signal the thread to stop
|
|
|
|
if self.installation_thread and self.installation_thread.is_alive():
|
|
print("Waiting for installation thread to acknowledge cancellation...")
|
|
self.installation_thread.join(timeout=10) # Wait for thread up to 10s. Adjust timeout as needed.
|
|
if self.installation_thread.is_alive():
|
|
print("Warning: Installation thread did not terminate gracefully after cancel signal.")
|
|
|
|
# Perform cleanup
|
|
print("Performing cleanup after cancellation...")
|
|
try:
|
|
if os.path.exists("selections.json"):
|
|
os.remove("selections.json")
|
|
print("Deleted selections.json")
|
|
except Exception as e:
|
|
print(f"Error deleting selections.json: {e}")
|
|
|
|
try:
|
|
if os.path.exists("connection_data.json"):
|
|
os.remove("connection_data.json")
|
|
print("Deleted connection_data.json")
|
|
except Exception as e:
|
|
print(f"Error deleting connection_data.json: {e}")
|
|
|
|
try:
|
|
if self.temp_folder_path and os.path.exists(self.temp_folder_path):
|
|
shutil.rmtree(self.temp_folder_path)
|
|
print(f"Deleted temporary folder: {self.temp_folder_path}")
|
|
self.temp_folder_path = None # Reset path
|
|
except Exception as e:
|
|
print(f"Error deleting temporary folder {self.temp_folder_path}: {e}")
|
|
|
|
# Update UI
|
|
if hasattr(self, 'cancel_button'):
|
|
self.cancel_button.config(state=tk.DISABLED)
|
|
if hasattr(self, 'prev_button'):
|
|
self.prev_button.config(state=tk.NORMAL) # Allow going back
|
|
if hasattr(self, 'next_button'):
|
|
self.next_button.config(state=tk.DISABLED) # Cannot proceed
|
|
|
|
# Update checklist: Mark current or 'Finishing up' as cancelled
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_FINISHING_UP, success=False))
|
|
if self.STATUS_FINISHING_UP in self.status_vars:
|
|
self.controller.after(0, lambda: self.status_vars[self.STATUS_FINISHING_UP].set(f"⚠️ {self.STATUS_FINISHING_UP} (Cancelled)"))
|
|
|
|
# Reset progress bar and labels that appear during installation
|
|
if hasattr(self, 'progress_label') and self.progress_label.winfo_exists():
|
|
self.progress_label.pack_forget()
|
|
if hasattr(self, 'progress_bar') and self.progress_bar.winfo_exists():
|
|
self.progress_bar.pack_forget()
|
|
if hasattr(self, 'confirm_label') and hasattr(self.confirm_label, 'winfo_exists') and self.confirm_label.winfo_exists():
|
|
self.confirm_label.pack_forget()
|
|
|
|
print("Cancellation process complete.")
|
|
|
|
def request_cancel_confirmation_from_x(self):
|
|
# This method is called when the main window's 'X' is clicked during installation.
|
|
winsound.PlaySound("SystemAsterisk", winsound.SND_ALIAS | winsound.SND_ASYNC)
|
|
|
|
response = messagebox.askyesno(
|
|
title="Confirm Cancellation",
|
|
message="This will abort the installation process completely. If you wanna use the tool again, you will have to restart from scratch. Is that okay?",
|
|
parent=self
|
|
)
|
|
|
|
if response: # True if "Yes" is clicked
|
|
self.initiate_cancellation_process() # This handles event setting, thread join, cleanup
|
|
# After cancellation process is done (including thread join), destroy the main app window.
|
|
self.controller.destroy()
|
|
return True # Indicates that "Yes, Cancel" was chosen and processed
|
|
else:
|
|
# User clicked "No", do nothing to the installation, window should not close.
|
|
print("User chose not to cancel via X button.")
|
|
return False # Indicates that "No, Continue" was chosen |