import sys import webbrowser import threading import subprocess import os import zipfile import tempfile import shutil import json import re import math from pathlib import Path from datetime import datetime from flask import Flask, render_template, request, send_file, jsonify from flask_socketio import SocketIO, emit import time from PyPDF2 import PdfMerger, PdfReader, PdfWriter from reportlab.pdfgen import canvas from reportlab.lib.pagesizes import letter from variant_manager import VariantManager import pyodbc from io import BytesIO app = Flask(__name__) app.config['SECRET_KEY'] = 'secret!' socketio = SocketIO(app, cors_allowed_origins="*", ping_timeout=120, # 2 minutes timeout ping_interval=25) # Send ping every 25 seconds # Store arguments app_args = {} connected_clients = set() heartbeat_timeout = 5 # seconds shutdown_timer = None # Track shutdown timer to cancel if client reconnects # Configuration config_file = 'config.json' app_config = {} def load_config(): """Load configuration from file""" global app_config if os.path.exists(config_file): with open(config_file, 'r') as f: app_config = json.load(f) else: app_config = { 'parts_spreadsheet_path': '' } return app_config def save_config(): """Save configuration to file""" with open(config_file, 'w') as f: json.dump(app_config, f, indent=2) def add_text_overlay_to_pdf(input_pdf_path, output_pdf_path, text, x=50, y=750, font_size=14): """Add text overlay to upper left corner of PDF""" # Read the existing PDF reader = PdfReader(input_pdf_path) writer = PdfWriter() # Create text overlay packet = BytesIO() can = canvas.Canvas(packet, pagesize=letter) can.setFont("Helvetica-Bold", font_size) can.drawString(x, y, text) can.save() # Move to the beginning of the BytesIO buffer packet.seek(0) overlay_pdf = PdfReader(packet) # Merge overlay with each page for page in reader.pages: page.merge_page(overlay_pdf.pages[0]) writer.add_page(page) # Write output with open(output_pdf_path, 'wb') as output_file: writer.write(output_file) # --------------------------------------------------------------------------- # KiCad Library Functions # --------------------------------------------------------------------------- def get_kicad_lib_path(): """Get the KiCad library path from UM_KICAD environment variable.""" um_kicad = os.environ.get('UM_KICAD') if not um_kicad: return None return os.path.join(um_kicad, 'lib') def get_symbol_libraries(): """Get list of all symbol library files.""" lib_path = get_kicad_lib_path() if not lib_path: return [] symbols_dir = os.path.join(lib_path, 'symbols') if not os.path.exists(symbols_dir): return [] libraries = [] for filename in os.listdir(symbols_dir): if filename.endswith('.kicad_sym'): # Remove .kicad_sym extension for display display_name = filename[:-10] if filename.endswith('.kicad_sym') else filename libraries.append({ 'name': display_name, 'path': os.path.join(symbols_dir, filename) }) return sorted(libraries, key=lambda x: x['name']) def get_footprint_libraries(): """Get list of all footprint libraries (.pretty directories).""" lib_path = get_kicad_lib_path() if not lib_path: return [] footprints_dir = os.path.join(lib_path, 'footprints') if not os.path.exists(footprints_dir): return [] libraries = [] for item in os.listdir(footprints_dir): item_path = os.path.join(footprints_dir, item) if os.path.isdir(item_path) and item.endswith('.pretty'): # Remove .pretty extension for display display_name = item[:-7] if item.endswith('.pretty') else item libraries.append({ 'name': display_name, 'path': item_path }) return sorted(libraries, key=lambda x: x['name']) def parse_kicad_symbol_file(file_path): """Parse a KiCad symbol library file and extract symbol names.""" symbols = [] try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Find all symbols (top-level only, not sub-symbols) symbol_pattern = r'^\t\(symbol\s+"([^"]+)"' symbol_names = re.findall(symbol_pattern, content, re.MULTILINE) # Filter out sub-symbols (those with _X_Y suffix) for symbol_name in symbol_names: if not re.match(r'.*_\d+_\d+$', symbol_name): symbols.append(symbol_name) except Exception as e: print(f"Error parsing {file_path}: {e}") return sorted(symbols) def get_footprints_in_library(library_path): """Get list of all footprints in a .pretty library.""" footprints = [] try: for filename in os.listdir(library_path): if filename.endswith('.kicad_mod'): footprint_name = filename[:-10] # Remove .kicad_mod extension footprints.append(footprint_name) except Exception as e: print(f"Error reading footprints from {library_path}: {e}") return sorted(footprints) def extract_symbol_graphics(file_path, symbol_name): """Extract graphical elements from a symbol for rendering.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Find the symbol definition symbol_pattern = re.compile(r'^\t\(symbol\s+"' + re.escape(symbol_name) + r'"\s*$', re.MULTILINE) match = symbol_pattern.search(content) if not match: return None symbol_start = match.start() # Find the end of this symbol next_symbol_pattern = re.compile(r'^\t\(symbol\s+"[^"]+"', re.MULTILINE) next_match = next_symbol_pattern.search(content, symbol_start + len(symbol_name) + 10) if next_match: symbol_block = content[symbol_start:next_match.start()] else: end_pattern = re.compile(r'^\)', re.MULTILINE) end_match = end_pattern.search(content, symbol_start + 1) if end_match: symbol_block = content[symbol_start:end_match.start()] else: symbol_block = content[symbol_start:] # Filter out alternate De Morgan representations symbol_block = re.sub( r'^\t\t\(symbol\s+"' + re.escape(symbol_name) + r'_(\d+)_([2-9])".*?^\t\t\)', '', symbol_block, flags=re.MULTILINE | re.DOTALL ) return symbol_block except Exception as e: print(f"Error extracting graphics for {symbol_name}: {e}") return None def render_symbol_to_svg(symbol_block): """Convert symbol graphics to SVG.""" if not symbol_block: return None svg_elements = [] min_x, min_y, max_x, max_y = 0, 0, 0, 0 # Parse polylines for polyline in re.finditer(r'\(polyline\n(.*?)\n\t+\)', symbol_block, re.DOTALL): polyline_text = polyline.group(0) points = [] # Extract points pts_match = re.search(r'\(pts\s+(.*?)\n\s*\)', polyline_text, re.DOTALL) if pts_match: pts_content = pts_match.group(1) xy_pattern = r'\(xy\s+([-\d.]+)\s+([-\d.]+)\)' for match in re.finditer(xy_pattern, pts_content): x, y = float(match.group(1)), -float(match.group(2)) points.append((x, y)) if points: stroke_width = 0.254 stroke_match = re.search(r'\(width\s+([-\d.]+)\)', polyline_text) if stroke_match: stroke_width = float(stroke_match.group(1)) fill_type = 'none' fill_match = re.search(r'\(fill\s+\(type\s+(\w+)\)', polyline_text) if fill_match: fill_type = fill_match.group(1) path_d = f"M {points[0][0]} {points[0][1]}" for x, y in points[1:]: path_d += f" L {x} {y}" fill_color = 'none' if fill_type == 'none' else '#FFFFCC' if fill_type == 'outline': path_d += ' Z' fill_color = '#FFFFCC' svg_elements.append(f'') for x, y in points: min_x, max_x = min(min_x, x), max(max_x, x) min_y, max_y = min(min_y, y), max(max_y, y) # Parse circles for circle in re.finditer(r'\(circle\n(.*?)\n\t+\)', symbol_block, re.DOTALL): circle_text = circle.group(0) center_match = re.search(r'\(center\s+([-\d.]+)\s+([-\d.]+)\)', circle_text) radius_match = re.search(r'\(radius\s+([-\d.]+)\)', circle_text) if center_match and radius_match: cx, cy = float(center_match.group(1)), -float(center_match.group(2)) radius = float(radius_match.group(1)) stroke_width = 0.254 stroke_match = re.search(r'\(width\s+([-\d.]+)\)', circle_text) if stroke_match: stroke_width = float(stroke_match.group(1)) fill_type = 'none' fill_match = re.search(r'\(fill\s+\(type\s+(\w+)\)', circle_text) if fill_match: fill_type = fill_match.group(1) fill_color = 'none' if fill_type == 'none' else '#FFFFCC' svg_elements.append(f'') min_x, max_x = min(min_x, cx - radius), max(max_x, cx + radius) min_y, max_y = min(min_y, cy - radius), max(max_y, cy + radius) # Parse rectangles for rect in re.finditer(r'(\t+)\(rectangle\n.*?\n\1\)', symbol_block, re.DOTALL): rect_text = rect.group(0) start_match = re.search(r'\(start\s+([-\d.]+)\s+([-\d.]+)\)', rect_text) end_match = re.search(r'\(end\s+([-\d.]+)\s+([-\d.]+)\)', rect_text) if start_match and end_match: x1, y1 = float(start_match.group(1)), -float(start_match.group(2)) x2, y2 = float(end_match.group(1)), -float(end_match.group(2)) width = abs(x2 - x1) height = abs(y2 - y1) x = min(x1, x2) y = min(y1, y2) stroke_width = 0.254 stroke_match = re.search(r'\(width\s+([-\d.]+)\)', rect_text) if stroke_match: stroke_width = max(float(stroke_match.group(1)), 0.1) fill_type = 'none' fill_match = re.search(r'\(fill\s+\(type\s+(\w+)\)', rect_text) if fill_match: fill_type = fill_match.group(1) fill_color = 'none' if fill_type == 'none' else '#FFFFCC' svg_elements.append(f'') min_x, max_x = min(min_x, x), max(max_x, x + width) min_y, max_y = min(min_y, y), max(max_y, y + height) # Parse text elements for text in re.finditer(r'\(text\s+"([^"]*)".*?\n\t+\)', symbol_block, re.DOTALL): text_str = text.group(1) text_block = text.group(0) at_match = re.search(r'\(at\s+([-\d.]+)\s+([-\d.]+)', text_block) if at_match and text_str: x, y = float(at_match.group(1)), -float(at_match.group(2)) size = 1.27 size_match = re.search(r'\(size\s+([-\d.]+)', text_block) if size_match: size = float(size_match.group(1)) svg_elements.append(f'{text_str}') # Parse pins for pin in re.finditer(r'(\t+)\(pin\s+\w+\s+\w+\n.*?\n\1\)', symbol_block, re.DOTALL): pin_text = pin.group(0) at_match = re.search(r'\(at\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\)', pin_text) length_match = re.search(r'\(length\s+([-\d.]+)\)', pin_text) number_match = re.search(r'\(number\s+"([^"]*)"', pin_text) name_match = re.search(r'\(name\s+"([^"]*)"', pin_text) if at_match and length_match: x, y = float(at_match.group(1)), -float(at_match.group(2)) angle = float(at_match.group(3)) length = float(length_match.group(1)) number = number_match.group(1) if number_match else "" name = name_match.group(1) if name_match else "" # Filter out placeholder names if name == "~": name = "" angle_rad = math.radians(angle) x2 = x + length * math.cos(angle_rad) y2 = y - length * math.sin(angle_rad) svg_elements.append(f'') svg_elements.append(f'') # Pin number at the connection point (outside) if number: # Position number outside the pin with more offset num_offset = 1.5 num_x = x - math.cos(angle_rad) * num_offset num_y = y + math.sin(angle_rad) * num_offset # Determine text anchor based on pin direction anchor = "middle" if angle == 0: # Right - number on left side anchor = "end" num_x = x - num_offset elif angle == 180: # Left - number on right side anchor = "start" num_x = x + num_offset elif angle == 90: # Down - number on top anchor = "middle" num_y = y - num_offset elif angle == 270: # Up - number on bottom anchor = "middle" num_y = y + num_offset svg_elements.append(f'{number}') # Pin name at the inner end (inside symbol) if name: # Position name inside the symbol with more offset from pin end name_offset = 1.2 name_x = x2 - math.cos(angle_rad) * name_offset name_y = y2 + math.sin(angle_rad) * name_offset # Determine text anchor based on pin direction anchor = "middle" if angle == 0: # Right - name inside on right anchor = "start" name_x = x2 + name_offset elif angle == 180: # Left - name inside on left anchor = "end" name_x = x2 - name_offset elif angle == 90: # Down - name inside below anchor = "middle" name_y = y2 + name_offset elif angle == 270: # Up - name inside above anchor = "middle" name_y = y2 - name_offset svg_elements.append(f'{name}') min_x = min(min_x, x, x2) max_x = max(max_x, x, x2) min_y = min(min_y, y, y2) max_y = max(max_y, y, y2) if not svg_elements: return 'No preview' # Add padding padding = 5 min_x -= padding min_y -= padding max_x += padding max_y += padding width = max_x - min_x height = max_y - min_y scale = 10 svg = f''' {''.join(svg_elements)} ''' return svg def render_footprint_to_svg(file_path): """Render a KiCad footprint to SVG.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() svg_elements = [] min_x, min_y, max_x, max_y = 0, 0, 0, 0 # Parse fp_line elements (silkscreen/fab lines) for line in re.finditer(r'\(fp_line\s+(.*?)\n\t\)', content, re.DOTALL): line_text = line.group(1) start_match = re.search(r'\(start\s+([-\d.]+)\s+([-\d.]+)\)', line_text) end_match = re.search(r'\(end\s+([-\d.]+)\s+([-\d.]+)\)', line_text) width_match = re.search(r'\(width\s+([-\d.]+)\)', line_text) layer_match = re.search(r'\(layer\s+"([^"]+)"\)', line_text) if start_match and end_match: x1, y1 = float(start_match.group(1)), float(start_match.group(2)) x2, y2 = float(end_match.group(1)), float(end_match.group(2)) width = float(width_match.group(1)) if width_match else 0.15 layer = layer_match.group(1) if layer_match else "F.SilkS" # Color by layer color = "#CC00CC" if "SilkS" in layer else "#888888" svg_elements.append(f'') min_x = min(min_x, x1, x2) max_x = max(max_x, x1, x2) min_y = min(min_y, y1, y2) max_y = max(max_y, y1, y2) # Parse fp_circle elements for circle in re.finditer(r'\(fp_circle\s+(.*?)\n\t\)', content, re.DOTALL): circle_text = circle.group(1) center_match = re.search(r'\(center\s+([-\d.]+)\s+([-\d.]+)\)', circle_text) end_match = re.search(r'\(end\s+([-\d.]+)\s+([-\d.]+)\)', circle_text) width_match = re.search(r'\(width\s+([-\d.]+)\)', circle_text) layer_match = re.search(r'\(layer\s+"([^"]+)"\)', circle_text) if center_match and end_match: cx, cy = float(center_match.group(1)), float(center_match.group(2)) ex, ey = float(end_match.group(1)), float(end_match.group(2)) radius = ((ex - cx)**2 + (ey - cy)**2)**0.5 width = float(width_match.group(1)) if width_match else 0.15 layer = layer_match.group(1) if layer_match else "F.SilkS" color = "#CC00CC" if "SilkS" in layer else "#888888" svg_elements.append(f'') min_x = min(min_x, cx - radius) max_x = max(max_x, cx + radius) min_y = min(min_y, cy - radius) max_y = max(max_y, cy + radius) # Parse fp_rect elements for rect in re.finditer(r'\(fp_rect\s+(.*?)\n\t\)', content, re.DOTALL): rect_text = rect.group(1) start_match = re.search(r'\(start\s+([-\d.]+)\s+([-\d.]+)\)', rect_text) end_match = re.search(r'\(end\s+([-\d.]+)\s+([-\d.]+)\)', rect_text) width_match = re.search(r'\(width\s+([-\d.]+)\)', rect_text) layer_match = re.search(r'\(layer\s+"([^"]+)"\)', rect_text) if start_match and end_match: x1, y1 = float(start_match.group(1)), float(start_match.group(2)) x2, y2 = float(end_match.group(1)), float(end_match.group(2)) width = float(width_match.group(1)) if width_match else 0.15 layer = layer_match.group(1) if layer_match else "F.SilkS" color = "#CC00CC" if "SilkS" in layer else "#888888" w = abs(x2 - x1) h = abs(y2 - y1) x = min(x1, x2) y = min(y1, y2) svg_elements.append(f'') min_x = min(min_x, x) max_x = max(max_x, x + w) min_y = min(min_y, y) max_y = max(max_y, y + h) # Parse pads for pad in re.finditer(r'\(pad\s+"([^"]+)"\s+(\w+)\s+(\w+)\s+(.*?)\n\t\)', content, re.DOTALL): pad_num = pad.group(1) pad_type = pad.group(2) # smd, thru_hole, np_thru_hole pad_shape = pad.group(3) # rect, circle, oval, roundrect pad_params = pad.group(4) at_match = re.search(r'\(at\s+([-\d.]+)\s+([-\d.]+)(?:\s+([-\d.]+))?\)', pad_params) size_match = re.search(r'\(size\s+([-\d.]+)\s+([-\d.]+)\)', pad_params) if at_match and size_match: px, py = float(at_match.group(1)), float(at_match.group(2)) rotation = float(at_match.group(3)) if at_match.group(3) else 0 width, height = float(size_match.group(1)), float(size_match.group(2)) # Color based on pad type pad_color = "#C87533" if pad_type == "smd" else "#FFD700" # Create transform for rotation (only add if rotation is non-zero) transform_attr = f' transform="rotate({rotation} {px} {py})"' if rotation != 0 else '' if pad_shape == "rect": x = px - width / 2 y = py - height / 2 svg_elements.append(f'') elif pad_shape == "circle": r = width / 2 svg_elements.append(f'') elif pad_shape == "oval": rx = width / 2 ry = height / 2 svg_elements.append(f'') elif pad_shape == "roundrect": # Extract roundrect_rratio if available rratio_match = re.search(r'\(roundrect_rratio\s+([-\d.]+)\)', pad_params) rratio = float(rratio_match.group(1)) if rratio_match else 0.25 # Calculate corner radius based on the smaller dimension corner_radius = min(width, height) * rratio x = px - width / 2 y = py - height / 2 svg_elements.append(f'') # Add pad number svg_elements.append(f'{pad_num}') min_x = min(min_x, px - width / 2) max_x = max(max_x, px + width / 2) min_y = min(min_y, py - height / 2) max_y = max(max_y, py + height / 2) if not svg_elements: return 'No preview' # Add padding (as percentage of footprint size for better scaling) width = max_x - min_x height = max_y - min_y padding = max(width, height) * 0.15 # 15% padding min_x -= padding min_y -= padding max_x += padding max_y += padding width = max_x - min_x height = max_y - min_y # Don't set fixed width/height - let it scale to fill container while maintaining aspect ratio svg = f''' {''.join(svg_elements)} ''' return svg except Exception as e: print(f"Error rendering footprint: {e}") return None @app.route('/') def index(): # Reconstruct the command line that invoked this app cmd_parts = [sys.argv[0]] for i in range(1, len(sys.argv)): arg = sys.argv[i] # Quote arguments with spaces if ' ' in arg: cmd_parts.append(f'"{arg}"') else: cmd_parts.append(arg) invocation_cmd = ' '.join(cmd_parts) # Check if called with no arguments (library-only mode) library_only_mode = len(sys.argv) == 1 or not app_args return render_template('index.html', args=app_args, invocation_cmd=invocation_cmd, library_only_mode=library_only_mode) @socketio.on('connect') def handle_connect(): global shutdown_timer # Cancel shutdown timer if client reconnects if shutdown_timer is not None: print("Client reconnected, canceling shutdown") shutdown_timer.cancel() shutdown_timer = None connected_clients.add(request.sid) print(f"Client connected: {request.sid}") @socketio.on('disconnect') def handle_disconnect(): global shutdown_timer connected_clients.discard(request.sid) print(f"Client disconnected: {request.sid}") # Shutdown if no clients connected after waiting for potential reconnect if not connected_clients: print("No clients connected. Waiting 2 seconds for reconnect before shutting down...") shutdown_timer = threading.Timer(2.0, shutdown_server) shutdown_timer.start() @socketio.on('heartbeat') def handle_heartbeat(): emit('heartbeat_ack') @socketio.on('generate_pdf') def handle_generate_pdf(): try: kicad_cli = app_args.get('Kicad Cli', '') schematic_file = app_args.get('Schematic File', '') board_file = app_args.get('Board File', '') project_dir = app_args.get('Project Dir', '') project_name = app_args.get('Project Name', 'project') if not kicad_cli: emit('pdf_error', {'error': 'Missing kicad-cli argument'}) return # Create temporary directory for PDFs temp_dir = tempfile.mkdtemp() schematics_dir = os.path.join(temp_dir, 'schematics') board_dir = os.path.join(temp_dir, 'board') os.makedirs(schematics_dir, exist_ok=True) os.makedirs(board_dir, exist_ok=True) # Generate schematic PDF if schematic_file: emit('pdf_status', {'status': 'Generating schematic PDF...'}) sch_pdf_path = os.path.join(schematics_dir, f'{project_name}_schematic.pdf') cmd = [kicad_cli, 'sch', 'export', 'pdf', schematic_file, '-o', sch_pdf_path] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: shutil.rmtree(temp_dir) emit('pdf_error', {'error': f'Schematic PDF failed: {result.stderr}'}) return # Generate board layer PDFs - one per layer, then merge if board_file: emit('pdf_status', {'status': 'Generating board layer PDFs...'}) # All layers to export layers = [ ('F.Cu', 'Top_Copper'), ('B.Cu', 'Bottom_Copper'), ('F.Silkscreen', 'Top_Silkscreen'), ('B.Silkscreen', 'Bottom_Silkscreen'), ('F.Mask', 'Top_Soldermask'), ('B.Mask', 'Bottom_Soldermask'), ('F.Paste', 'Top_Paste'), ('B.Paste', 'Bottom_Paste'), ('Edge.Cuts', 'Board_Outline'), ('F.Fab', 'Top_Fabrication'), ('B.Fab', 'Bottom_Fabrication'), ] temp_pdf_dir = os.path.join(temp_dir, 'temp_pdfs') os.makedirs(temp_pdf_dir, exist_ok=True) pdf_files = [] for layer_name, file_suffix in layers: pdf_path = os.path.join(temp_pdf_dir, f'{file_suffix}.pdf') # Include Edge.Cuts on every layer except the Edge.Cuts layer itself if layer_name == 'Edge.Cuts': layers_to_export = layer_name else: layers_to_export = f"{layer_name},Edge.Cuts" cmd = [ kicad_cli, 'pcb', 'export', 'pdf', board_file, '-l', layers_to_export, '--include-border-title', '-o', pdf_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: pdf_files.append(pdf_path) else: print(f"Warning: Failed to generate {layer_name}: {result.stderr}") # Merge all PDFs into one if pdf_files: emit('pdf_status', {'status': 'Merging board layer PDFs...'}) merged_pdf_path = os.path.join(board_dir, f'{project_name}.pdf') merger = PdfMerger() for pdf in pdf_files: merger.append(pdf) merger.write(merged_pdf_path) merger.close() # Delete temp PDF directory shutil.rmtree(temp_pdf_dir) # Create ZIP file emit('pdf_status', {'status': 'Creating ZIP archive...'}) zip_filename = f'{project_name}_PDFs.zip' zip_path = os.path.join(project_dir if project_dir else temp_dir, zip_filename) with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(temp_dir): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, temp_dir) zipf.write(file_path, arcname) # Clean up temp directory shutil.rmtree(temp_dir) emit('pdf_complete', {'path': zip_path, 'filename': zip_filename}) except Exception as e: emit('pdf_error', {'error': str(e)}) @socketio.on('generate_gerbers') def handle_generate_gerbers(): try: kicad_cli = app_args.get('Kicad Cli', '') board_file = app_args.get('Board File', '') project_dir = app_args.get('Project Dir', '') project_name = app_args.get('Project Name', 'project') if not kicad_cli or not board_file: emit('gerber_error', {'error': 'Missing kicad-cli or board-file arguments'}) return # Create temporary directory for gerbers temp_dir = tempfile.mkdtemp() gerber_dir = os.path.join(temp_dir, 'gerbers') os.makedirs(gerber_dir, exist_ok=True) # Generate gerbers emit('gerber_status', {'status': 'Generating gerber files...'}) cmd = [kicad_cli, 'pcb', 'export', 'gerbers', board_file, '-o', gerber_dir] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: shutil.rmtree(temp_dir) emit('gerber_error', {'error': f'Gerber generation failed: {result.stderr}'}) return # Generate drill files emit('gerber_status', {'status': 'Generating drill files...'}) cmd = [kicad_cli, 'pcb', 'export', 'drill', board_file, '-o', gerber_dir] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Warning: Drill file generation failed: {result.stderr}") # Generate ODB++ files emit('gerber_status', {'status': 'Generating ODB++ files...'}) odb_dir = os.path.join(temp_dir, 'odb') os.makedirs(odb_dir, exist_ok=True) odb_file = os.path.join(odb_dir, f'{project_name}.zip') cmd = [kicad_cli, 'pcb', 'export', 'odb', board_file, '-o', odb_file] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: print(f"Warning: ODB++ generation failed: {result.stderr}") # Create ZIP file emit('gerber_status', {'status': 'Creating ZIP archive...'}) zip_filename = f'{project_name}_fab.zip' zip_path = os.path.join(project_dir if project_dir else temp_dir, zip_filename) with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # Add gerbers folder for root, dirs, files in os.walk(gerber_dir): for file in files: file_path = os.path.join(root, file) arcname = os.path.join('gerbers', os.path.basename(file_path)) zipf.write(file_path, arcname) # Add odb folder for root, dirs, files in os.walk(odb_dir): for file in files: file_path = os.path.join(root, file) arcname = os.path.join('odb', os.path.relpath(file_path, odb_dir)) zipf.write(file_path, arcname) # Clean up temp directory shutil.rmtree(temp_dir) emit('gerber_complete', {'path': zip_path, 'filename': zip_filename}) except Exception as e: emit('gerber_error', {'error': str(e)}) @socketio.on('export_step') def handle_export_step(): try: kicad_cli = app_args.get('Kicad Cli', '') board_file = app_args.get('Board File', '') project_dir = app_args.get('Project Dir', '') project_name = app_args.get('Project Name', 'project') if not kicad_cli or not board_file: emit('step_error', {'error': 'Missing kicad-cli or board-file arguments'}) return # Create output filename step_filename = f'{project_name}.step' step_path = os.path.join(project_dir if project_dir else os.path.dirname(board_file), step_filename) # Generate STEP file emit('step_status', {'status': 'Exporting PCB to STEP format...'}) cmd = [kicad_cli, 'pcb', 'export', 'step', board_file, '-o', step_path] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: emit('step_error', {'error': f'STEP export failed: {result.stderr}'}) return if not os.path.exists(step_path): emit('step_error', {'error': 'STEP file was not created'}) return emit('step_complete', {'path': step_path, 'filename': step_filename}) except Exception as e: emit('step_error', {'error': str(e)}) @socketio.on('render_pcb') def handle_render_pcb(): try: kicad_cli = app_args.get('Kicad Cli', '') board_file = app_args.get('Board File', '') project_dir = app_args.get('Project Dir', '') project_name = app_args.get('Project Name', 'project') if not kicad_cli or not board_file: emit('render_error', {'error': 'Missing kicad-cli or board-file arguments'}) return # Create output filename render_filename = f'{project_name}_iso_view.png' render_path = os.path.join(project_dir if project_dir else os.path.dirname(board_file), render_filename) # Render PCB with isometric view emit('render_status', {'status': 'Rendering PCB image...'}) cmd = [ kicad_cli, 'pcb', 'render', board_file, '--rotate', '25,0,45', '-o', render_path ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: error_msg = f'PCB render failed:\nCommand: {" ".join(cmd)}\nStderr: {result.stderr}\nStdout: {result.stdout}' emit('render_error', {'error': error_msg}) return if not os.path.exists(render_path): emit('render_error', {'error': 'Rendered image was not created'}) return emit('render_complete', {'path': render_path, 'filename': render_filename}) except Exception as e: emit('render_error', {'error': str(e)}) @socketio.on('sync_libraries') def handle_sync_libraries(): try: emit('sync_status', {'status': 'Starting library synchronization...'}) # Check if UM_KICAD is set um_kicad = os.environ.get('UM_KICAD') if not um_kicad: emit('sync_error', {'error': 'UM_KICAD environment variable is not set in the Flask app environment'}) return emit('sync_status', {'status': f'UM_KICAD is set to: {um_kicad}'}) # Run the add_libraries.py script script_path = os.path.join(os.path.dirname(__file__), 'add_libraries.py') if not os.path.exists(script_path): emit('sync_error', {'error': 'add_libraries.py script not found'}) return result = subprocess.run( [sys.executable, script_path], capture_output=True, text=True, env=os.environ.copy() ) output = result.stdout + result.stderr if result.returncode == 0: emit('sync_complete', {'output': output}) else: emit('sync_error', {'error': f'Sync failed:\n{output}'}) except Exception as e: emit('sync_error', {'error': str(e)}) @socketio.on('sync_database') def handle_sync_database(): try: emit('db_sync_status', {'status': 'Starting database synchronization...'}) # Get the parts spreadsheet path from config parts_spreadsheet = app_config.get('parts_spreadsheet_path', '') if not parts_spreadsheet: emit('db_sync_error', {'error': 'Parts spreadsheet path not configured. Please set it in Settings.'}) return if not os.path.exists(parts_spreadsheet): emit('db_sync_error', {'error': f'Parts spreadsheet not found at: {parts_spreadsheet}'}) return emit('db_sync_status', {'status': f'Using parts spreadsheet: {parts_spreadsheet}'}) # Run the gen_resistors_db.py script script_path = os.path.join(os.path.dirname(__file__), 'gen_resistors_db.py') if not os.path.exists(script_path): emit('db_sync_error', {'error': 'gen_resistors_db.py script not found'}) return result = subprocess.run( [sys.executable, script_path, parts_spreadsheet], capture_output=True, text=True, env=os.environ.copy() ) output = result.stdout + result.stderr if result.returncode == 0: emit('db_sync_complete', {'output': output}) else: emit('db_sync_error', {'error': f'Database sync failed:\n{output}'}) except Exception as e: emit('db_sync_error', {'error': str(e)}) @socketio.on('init_user') def handle_init_user(): try: emit('init_status', {'status': 'Starting user environment initialization...'}) # Check if UM_KICAD is set um_kicad = os.environ.get('UM_KICAD') if not um_kicad: emit('init_error', {'error': 'UM_KICAD environment variable is not set'}) return emit('init_status', {'status': f'UM_KICAD: {um_kicad}'}) # Run the init_user.py script script_path = os.path.join(os.path.dirname(__file__), 'init_user.py') if not os.path.exists(script_path): emit('init_error', {'error': 'init_user.py script not found'}) return result = subprocess.run( [sys.executable, script_path], capture_output=True, text=True, env=os.environ.copy() ) output = result.stdout + result.stderr if result.returncode == 0: emit('init_complete', {'output': output}) else: emit('init_error', {'error': f'Initialization failed:\n{output}'}) except Exception as e: emit('init_error', {'error': str(e)}) @app.route('/download/') def download_file(filename): project_dir = app_args.get('Project Dir', '') file_path = os.path.join(project_dir, filename) if os.path.exists(file_path): return send_file(file_path, as_attachment=True) return "File not found", 404 @app.route('/config', methods=['GET', 'POST']) def config(): if request.method == 'POST': data = request.get_json() app_config['parts_spreadsheet_path'] = data.get('parts_spreadsheet_path', '') save_config() return jsonify({'status': 'success', 'config': app_config}) else: return jsonify(app_config) @app.route('/variants') def variants_page(): return render_template('variants.html') # --------------------------------------------------------------------------- # Variant Management Socket Handlers # --------------------------------------------------------------------------- def get_variant_manager(): """Get VariantManager instance for current project""" schematic_file = app_args.get('Schematic File', '') if not schematic_file or not os.path.exists(schematic_file): return None return VariantManager(schematic_file) def get_all_schematic_files(root_schematic): """Get all schematic files in a hierarchical design""" from pathlib import Path root_path = Path(root_schematic) if not root_path.exists(): return [root_schematic] schematic_files = [str(root_path)] schematic_dir = root_path.parent try: with open(root_path, 'r', encoding='utf-8') as f: content = f.read() for line in content.split('\n'): if '(property "Sheetfile"' in line: parts = line.split('"') if len(parts) >= 4: sheet_file = parts[3] sheet_path = schematic_dir / sheet_file if sheet_path.exists(): sub_sheets = get_all_schematic_files(str(sheet_path)) for sub in sub_sheets: if sub not in schematic_files: schematic_files.append(sub) except: pass return schematic_files def get_all_parts_from_schematic(): """Get all component references, values, and UUIDs from all schematics (including hierarchical sheets)""" schematic_file = app_args.get('Schematic File', '') if not schematic_file or not os.path.exists(schematic_file): return [] # Get all schematic files all_schematics = get_all_schematic_files(schematic_file) all_parts = {} # uuid -> {reference, value} for sch_file in all_schematics: try: with open(sch_file, 'r', encoding='utf-8') as f: content = f.read() lines = content.split('\n') in_symbol = False current_uuid = None current_ref = None current_value = None current_lib_id = None for line in lines: stripped = line.strip() # Detect start of symbol if stripped.startswith('(symbol'): in_symbol = True current_uuid = None current_ref = None current_value = None current_lib_id = None # Detect end of symbol elif in_symbol and stripped == ')': # Save the part if we have all the info, excluding power symbols is_power = current_lib_id and 'power:' in current_lib_id is_power = is_power or (current_ref and current_ref.startswith('#')) if current_uuid and current_ref and not is_power and len(current_ref) > 1: all_parts[current_uuid] = { 'reference': current_ref, 'value': current_value or '' } in_symbol = False # Extract lib_id to check for power symbols elif in_symbol and '(lib_id' in stripped: lib_parts = line.split('"') if len(lib_parts) >= 2: current_lib_id = lib_parts[1] # Extract UUID elif in_symbol and '(uuid' in stripped: uuid_parts = line.split('"') if len(uuid_parts) >= 2: current_uuid = uuid_parts[1] # Extract reference - format: (property "Reference" "U1" ... elif in_symbol and '(property "Reference"' in line: try: start = line.find('"Reference"') + len('"Reference"') remainder = line[start:] quote_start = remainder.find('"') if quote_start != -1: quote_end = remainder.find('"', quote_start + 1) if quote_end != -1: current_ref = remainder[quote_start + 1:quote_end] except: pass # Extract value - format: (property "Value" "LM358" ... elif in_symbol and '(property "Value"' in line: try: start = line.find('"Value"') + len('"Value"') remainder = line[start:] quote_start = remainder.find('"') if quote_start != -1: quote_end = remainder.find('"', quote_start + 1) if quote_end != -1: current_value = remainder[quote_start + 1:quote_end] except: pass except Exception as e: print(f"Error reading schematic {sch_file}: {e}") return [{'uuid': uuid, 'reference': data['reference'], 'value': data['value']} for uuid, data in sorted(all_parts.items(), key=lambda x: x[1]['reference'])] @socketio.on('get_variants') def handle_get_variants(): try: manager = get_variant_manager() if not manager: emit('variant_error', {'error': 'No project loaded'}) return all_parts = get_all_parts_from_schematic() emit('variants_data', { 'project_name': manager.project_name, 'variants': manager.get_variants(), 'active_variant': manager.get_active_variant(), 'all_parts': all_parts # Now includes uuid, reference, and value }) except Exception as e: emit('variant_error', {'error': str(e)}) @socketio.on('create_variant') def handle_create_variant(data): try: manager = get_variant_manager() if not manager: emit('variant_error', {'error': 'No project loaded'}) return name = data.get('name', '') description = data.get('description', '') based_on = data.get('based_on', None) if not name: emit('variant_error', {'error': 'Variant name required'}) return success = manager.create_variant(name, description, based_on) if success: emit('variant_updated', {'message': f'Variant "{name}" created'}) else: emit('variant_error', {'error': f'Variant "{name}" already exists'}) except Exception as e: emit('variant_error', {'error': str(e)}) @socketio.on('delete_variant') def handle_delete_variant(data): try: manager = get_variant_manager() if not manager: emit('variant_error', {'error': 'No project loaded'}) return name = data.get('name', '') success = manager.delete_variant(name) if success: emit('variant_updated', {'message': f'Variant "{name}" deleted'}) else: emit('variant_error', {'error': f'Cannot delete variant "{name}"'}) except Exception as e: emit('variant_error', {'error': str(e)}) @socketio.on('activate_variant') def handle_activate_variant(data): try: import pygetwindow as gw import pyautogui import time import win32gui import win32con manager = get_variant_manager() if not manager: emit('variant_error', {'error': 'No project loaded'}) return name = data.get('name', '') schematic_file = app_args.get('Schematic File', '') kicad_cli = app_args.get('Kicad Cli', 'kicad-cli') # Step 1: Save and close schematic window emit('variant_status', {'status': 'Looking for KiCad schematic window...'}) all_windows = gw.getAllTitles() windows = gw.getWindowsWithTitle('Schematic Editor') window_found = False if not windows: schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()] if schematic_windows: windows = gw.getWindowsWithTitle(schematic_windows[0]) window_found = len(windows) > 0 else: window_found = True # If window is found, save and close it if window_found: window = windows[0] emit('variant_status', {'status': f'Saving and closing: {window.title}'}) hwnd = window._hWnd rect = win32gui.GetWindowRect(hwnd) x, y, x2, y2 = rect width = x2 - x # Click to activate click_x = x + width // 2 click_y = y + 10 pyautogui.click(click_x, click_y) time.sleep(0.5) # Save pyautogui.hotkey('ctrl', 's') time.sleep(1.0) # Close win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0) time.sleep(1.0) # Try Alt+F4 if still open if win32gui.IsWindow(hwnd): try: rect = win32gui.GetWindowRect(hwnd) x, y, x2, y2 = rect click_x = x + (x2 - x) // 2 click_y = y + 10 pyautogui.click(click_x, click_y) time.sleep(0.3) except: pass pyautogui.hotkey('alt', 'F4') time.sleep(1.0) # Step 2: Sync current variant from schematic current_variant = manager.get_active_variant() emit('variant_status', {'status': f'Syncing current variant "{current_variant}"...'}) from sync_variant import sync_variant_from_schematic try: sync_success = sync_variant_from_schematic(schematic_file, current_variant) if sync_success: manager = get_variant_manager() else: print(f"Warning: Sync of variant '{current_variant}' failed") except Exception as e: print(f"Error during sync: {e}") # Step 3: Activate new variant and apply to schematic emit('variant_status', {'status': f'Activating variant "{name}"...'}) success = manager.set_active_variant(name) if success: apply_script_path = os.path.join(os.path.dirname(__file__), 'apply_variant.py') result = subprocess.run( [sys.executable, apply_script_path, schematic_file, name, kicad_cli], capture_output=True, text=True ) if result.returncode != 0: error_msg = result.stderr if result.stderr else result.stdout emit('variant_error', {'error': f'Failed to apply variant: {error_msg}'}) return else: emit('variant_error', {'error': f'Variant "{name}" not found'}) return # Step 4: Reopen schematic editor emit('variant_status', {'status': 'Waiting 2 seconds before reopening...'}) time.sleep(2.0) emit('variant_status', {'status': 'Reopening schematic editor...'}) kicad_bin_dir = r"C:\Program Files\KiCad\9.0\bin" if not os.path.exists(kicad_bin_dir): kicad_bin_dir = r"C:\Program Files\KiCad\8.0\bin" eeschema_exe = os.path.join(kicad_bin_dir, "eeschema.exe") if os.path.exists(eeschema_exe): subprocess.Popen([eeschema_exe, schematic_file], shell=False) time.sleep(2.0) else: emit('variant_status', {'status': f'Warning: eeschema.exe not found at {eeschema_exe}'}) emit('variant_updated', {'message': f'Activated variant "{name}" and reopened schematic'}) except ImportError as e: emit('variant_error', {'error': f'Missing library: {str(e)}. Install: pip install pygetwindow pyautogui pywin32'}) except Exception as e: import traceback emit('variant_error', {'error': f'{str(e)}\n{traceback.format_exc()}'}) @socketio.on('get_variant_parts') def handle_get_variant_parts(data): try: manager = get_variant_manager() if not manager: emit('variant_error', {'error': 'No project loaded'}) return variant_name = data.get('variant', '') all_parts = get_all_parts_from_schematic() dnp_uuids = manager.get_dnp_parts(variant_name) parts_data = [] for part in all_parts: parts_data.append({ 'uuid': part['uuid'], 'reference': part['reference'], 'value': part['value'], 'is_dnp': part['uuid'] in dnp_uuids }) emit('variant_parts_data', {'parts': parts_data}) except Exception as e: emit('variant_error', {'error': str(e)}) @socketio.on('set_part_dnp') def handle_set_part_dnp(data): try: manager = get_variant_manager() if not manager: emit('variant_error', {'error': 'No project loaded'}) return variant = data.get('variant', '') uuid = data.get('uuid', '') is_dnp = data.get('is_dnp', False) success = manager.set_part_dnp(variant, uuid, is_dnp) if success: # Re-send updated parts list handle_get_variant_parts({'variant': variant}) else: emit('variant_error', {'error': 'Failed to update part'}) except Exception as e: emit('variant_error', {'error': str(e)}) @socketio.on('sync_from_schematic') def handle_sync_from_schematic(): try: manager = get_variant_manager() if not manager: emit('variant_error', {'error': 'No project loaded'}) return # Read DNP state from schematic and update active variant schematic_file = app_args.get('Schematic File', '') script_path = os.path.join(os.path.dirname(__file__), 'sync_variant.py') result = subprocess.run( [sys.executable, script_path, schematic_file], capture_output=True, text=True ) if result.returncode == 0: emit('sync_complete', {'message': f'Synced from schematic:\n{result.stdout}'}) else: emit('variant_error', {'error': f'Failed to sync: {result.stderr}'}) except Exception as e: emit('variant_error', {'error': str(e)}) @socketio.on('test_window_interaction') def handle_test_window_interaction(): try: import pygetwindow as gw import pyautogui import time import win32gui import win32con emit('window_test_status', {'status': 'Looking for KiCad schematic window...'}) # List all windows for debugging all_windows = gw.getAllTitles() emit('window_test_status', {'status': f'DEBUG: Found {len(all_windows)} windows total'}) # Find KiCad schematic editor window windows = gw.getWindowsWithTitle('Schematic Editor') emit('window_test_status', {'status': f'DEBUG: Found {len(windows)} windows with "Schematic Editor"'}) window_found = False if not windows: # Try alternative window title schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()] emit('window_test_status', {'status': f'DEBUG: Found {len(schematic_windows)} windows with "kicad" and "schematic"'}) if schematic_windows: emit('window_test_status', {'status': f'DEBUG: Using window: {schematic_windows[0]}'}) windows = gw.getWindowsWithTitle(schematic_windows[0]) window_found = len(windows) > 0 else: window_found = True # If window is found, close it if window_found: window = windows[0] emit('window_test_status', {'status': f'Found window: "{window.title}"'}) # Get window position and size hwnd = window._hWnd rect = win32gui.GetWindowRect(hwnd) x, y, x2, y2 = rect width = x2 - x height = y2 - y emit('window_test_status', {'status': f'DEBUG: Window position=({x},{y}), size=({width}x{height})'}) # Click on the window's title bar to activate it (more reliable than SetForegroundWindow) click_x = x + width // 2 click_y = y + 10 # Title bar is usually at the top emit('window_test_status', {'status': f'Clicking window at ({click_x}, {click_y}) to activate...'}) pyautogui.click(click_x, click_y) time.sleep(0.5) emit('window_test_status', {'status': 'Sending Ctrl+S (save)...'}) pyautogui.hotkey('ctrl', 's') time.sleep(1.0) emit('window_test_status', {'status': 'Attempting to close window...'}) # Method 1: Try WM_CLOSE message emit('window_test_status', {'status': 'DEBUG: Sending WM_CLOSE message...'}) win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0) time.sleep(1.0) # Check if window still exists if win32gui.IsWindow(hwnd): emit('window_test_status', {'status': 'DEBUG: Window still exists after WM_CLOSE, trying to click and send Alt+F4...'}) # Click on window again to make sure it has focus try: rect = win32gui.GetWindowRect(hwnd) x, y, x2, y2 = rect click_x = x + (x2 - x) // 2 click_y = y + 10 pyautogui.click(click_x, click_y) time.sleep(0.3) except: emit('window_test_status', {'status': 'DEBUG: Could not click window (may already be closed)'}) pyautogui.hotkey('alt', 'F4') time.sleep(1.0) # Final check if win32gui.IsWindow(hwnd): emit('window_test_status', {'status': 'DEBUG: Window still exists after Alt+F4 - may need manual intervention'}) else: emit('window_test_status', {'status': 'DEBUG: Window closed successfully via Alt+F4'}) else: emit('window_test_status', {'status': 'DEBUG: Window closed successfully via WM_CLOSE'}) else: emit('window_test_status', {'status': 'No KiCad schematic window found - will open it'}) # Wait a couple seconds before reopening emit('window_test_status', {'status': 'Waiting 2 seconds before reopening...'}) time.sleep(2.0) # Reopen the schematic editor schematic_file = app_args.get('Schematic File', '') if not schematic_file: emit('window_test_error', {'error': 'No schematic file specified in app arguments'}) return emit('window_test_status', {'status': f'Relaunching schematic editor with: {schematic_file}'}) # Launch KiCad schematic editor # The schematic editor executable is typically in the same directory as kicad.exe import os kicad_bin_dir = r"C:\Program Files\KiCad\9.0\bin" # Default KiCad 9 installation path if not os.path.exists(kicad_bin_dir): kicad_bin_dir = r"C:\Program Files\KiCad\8.0\bin" # Try KiCad 8 eeschema_exe = os.path.join(kicad_bin_dir, "eeschema.exe") if not os.path.exists(eeschema_exe): emit('window_test_error', {'error': f'KiCad executable not found at: {eeschema_exe}'}) return emit('window_test_status', {'status': f'DEBUG: Launching {eeschema_exe} {schematic_file}'}) # Launch KiCad with the schematic file result = subprocess.Popen([eeschema_exe, schematic_file], shell=False) emit('window_test_status', {'status': f'DEBUG: Process started with PID {result.pid}'}) time.sleep(2.0) # Verify the window opened all_windows = gw.getAllTitles() schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()] if schematic_windows: emit('window_test_status', {'status': f'Successfully reopened schematic: {schematic_windows[0]}'}) else: emit('window_test_status', {'status': 'Schematic editor launched but window not detected yet'}) emit('window_test_complete', {'message': 'Window interaction test completed!'}) except ImportError as e: emit('window_test_error', {'error': f'Missing required library: {str(e)}. Please install: pip install pygetwindow pyautogui pywin32'}) except Exception as e: import traceback emit('window_test_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) # --------------------------------------------------------------------------- # Library Management # --------------------------------------------------------------------------- def get_db_connection(): """Get ODBC connection to the parts database""" try: # Use DSN connection as configured in KiCad database library conn_str = "DSN=UM_KiCad_Parts;" conn = pyodbc.connect(conn_str) return conn except Exception as e: print(f"Database connection error: {e}") return None @socketio.on('search_parts') def handle_search_parts(data): try: search_query = data.get('query', '').strip() conn = get_db_connection() if not conn: emit('library_error', {'error': 'Could not connect to database'}) return cursor = conn.cursor() # Build search query - search across ipn, mpn, manufacturer, description, symbol, and footprint # Column names in SQLite are lowercase if search_query: sql = """ SELECT ipn, mpn, manufacturer, description, datasheet FROM parts WHERE ipn LIKE ? OR mpn LIKE ? OR manufacturer LIKE ? OR description LIKE ? OR symbol LIKE ? OR footprint LIKE ? ORDER BY ipn LIMIT 100 """ search_param = f'%{search_query}%' cursor.execute(sql, (search_param, search_param, search_param, search_param, search_param, search_param)) else: # No search query - return first 100 parts sql = """ SELECT ipn, mpn, manufacturer, description, datasheet FROM parts ORDER BY ipn LIMIT 100 """ cursor.execute(sql) rows = cursor.fetchall() parts = [] for row in rows: parts.append({ 'ipn': row[0] if row[0] else '', 'mpn': row[1] if row[1] else '', 'manufacturer': row[2] if row[2] else '', 'description': row[3] if row[3] else '', 'datasheet': row[4] if row[4] else '' }) cursor.close() conn.close() emit('library_search_results', {'parts': parts, 'count': len(parts)}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('get_missing_ipns') def handle_get_missing_ipns(): try: # Load config to get spreadsheet path config_path = os.path.join(os.path.dirname(__file__), 'config.json') with open(config_path, 'r') as f: config = json.load(f) spreadsheet_path = config.get('parts_spreadsheet_path', '') if not spreadsheet_path or not os.path.exists(spreadsheet_path): emit('library_error', {'error': 'Parts spreadsheet not found'}) return # Get all IPNs from database conn = get_db_connection() if not conn: emit('library_error', {'error': 'Could not connect to database'}) return cursor = conn.cursor() cursor.execute("SELECT ipn FROM parts") db_ipns = set(row.ipn for row in cursor.fetchall() if row.ipn) cursor.close() conn.close() # Read spreadsheet import openpyxl wb = openpyxl.load_workbook(spreadsheet_path, read_only=True, data_only=True) ws = wb.active # Find header row and column indices header_row = None for row in ws.iter_rows(min_row=1, max_row=10, values_only=True): if row and 'GLE P/N' in row: header_row = row break if not header_row: emit('library_error', {'error': 'Could not find header row in spreadsheet'}) return ipn_col = header_row.index('GLE P/N') desc_col = header_row.index('Description') if 'Description' in header_row else None class_col = header_row.index('Class') if 'Class' in header_row else None mfg_col = header_row.index('Mfg.1') if 'Mfg.1' in header_row else None mpn_col = header_row.index('Mfg.1 P/N') if 'Mfg.1 P/N' in header_row else None # Collect missing IPNs missing_parts = [] for row in ws.iter_rows(min_row=2, values_only=True): if not row or not row[ipn_col]: continue ipn = str(row[ipn_col]).strip() if not ipn or ipn in db_ipns: continue # Get manufacturer and MPN manufacturer = str(row[mfg_col]).strip() if mfg_col and row[mfg_col] else '' mpn = str(row[mpn_col]).strip() if mpn_col and row[mpn_col] else '' # Skip section headers - rows with IPN and description but no manufacturer or MPN if not manufacturer and not mpn: continue # Get and normalize class field class_value = str(row[class_col]).strip() if class_col and row[class_col] else '' if class_value: # Replace spaces and special characters with underscore, collapse multiple underscores import re class_value = re.sub(r'[^a-zA-Z0-9]+', '_', class_value.upper()) class_value = re.sub(r'_+', '_', class_value).strip('_') part = { 'ipn': ipn, 'description': str(row[desc_col]).strip() if desc_col and row[desc_col] else '', 'class': class_value, 'manufacturer': manufacturer, 'mpn': mpn } missing_parts.append(part) wb.close() # Sort by IPN missing_parts.sort(key=lambda x: x['ipn']) emit('missing_ipns_result', {'parts': missing_parts, 'count': len(missing_parts)}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('create_part') def handle_create_part(data): try: ipn = data.get('ipn', '').strip() manufacturer = data.get('manufacturer', '').strip() mpn = data.get('mpn', '').strip() description = data.get('description', '').strip() class_value = data.get('class', '').strip() datasheet = data.get('datasheet', '').strip() symbol = data.get('symbol', '').strip() footprint = data.get('footprint', '').strip() if not ipn: emit('library_error', {'error': 'IPN is required'}) return conn = get_db_connection() if not conn: emit('library_error', {'error': 'Could not connect to database'}) return cursor = conn.cursor() # Check if part already exists cursor.execute("SELECT ipn FROM parts WHERE ipn = ?", (ipn,)) if cursor.fetchone(): emit('library_error', {'error': f'Part {ipn} already exists in database'}) cursor.close() conn.close() return # Insert new part cursor.execute(""" INSERT INTO parts (ipn, manufacturer, mpn, description, class, datasheet, symbol, footprint) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (ipn, manufacturer, mpn, description, class_value, datasheet, symbol, footprint)) conn.commit() cursor.close() conn.close() emit('part_created', {'message': f'Successfully created part {ipn}'}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) def resolve_library_path(lib_ref, lib_type='symbol'): """Resolve library reference to full path using KiCad library tables. Args: lib_ref: Library reference in format LibraryName:ItemName lib_type: 'symbol' or 'footprint' Returns: tuple: (library_path, item_name, library_name) or (None, None, None) if not found """ try: if ':' not in lib_ref: return (None, None, None) lib_name, item_name = lib_ref.split(':', 1) # Get KiCad config directory kicad_config_dir = os.path.expanduser(os.path.join('~', 'AppData', 'Roaming', 'kicad', '9.0')) if lib_type == 'symbol': table_file = os.path.join(kicad_config_dir, 'sym-lib-table') else: table_file = os.path.join(kicad_config_dir, 'fp-lib-table') if not os.path.exists(table_file): return (None, None, None) # Parse the library table file with open(table_file, 'r', encoding='utf-8') as f: content = f.read() # Simple s-expression parsing to find the library entry # Handle both single-line and multi-line formats # Pattern matches (lib ... (name "X") ... (uri "Y") ... ) import re # Use DOTALL flag to match across newlines pattern = r'\(lib\s+.*?\(name\s+"([^"]+)"\).*?\(uri\s+"([^"]+)"\).*?\)' matches = re.findall(pattern, content, re.DOTALL) for match_name, match_uri in matches: if match_name == lib_name: # Resolve environment variables in the path lib_path = match_uri # Replace common KiCad environment variables env_vars = { '${KICAD9_SYMBOL_DIR}': os.environ.get('KICAD9_SYMBOL_DIR', ''), '${KICAD9_FOOTPRINT_DIR}': os.environ.get('KICAD9_FOOTPRINT_DIR', ''), '${UM_KICAD}': os.environ.get('UM_KICAD', '') } for var, val in env_vars.items(): if var in lib_path and val: lib_path = lib_path.replace(var, val) # Convert to absolute path with proper separators lib_path = os.path.normpath(lib_path.replace('/', os.sep)) if os.path.exists(lib_path): return (lib_path, item_name, lib_name) return (None, None, None) except Exception as e: print(f"Error resolving library path: {e}") return (None, None, None) @socketio.on('get_part_details') def handle_get_part_details(data): try: ipn = data.get('ipn', '').strip() if not ipn: emit('library_error', {'error': 'IPN is required'}) return conn = get_db_connection() if not conn: emit('library_error', {'error': 'Could not connect to database'}) return cursor = conn.cursor() cursor.execute(""" SELECT ipn, manufacturer, mpn, description, class, datasheet, symbol, footprint FROM parts WHERE ipn = ? """, (ipn,)) row = cursor.fetchone() cursor.close() conn.close() if not row: emit('library_error', {'error': f'Part {ipn} not found'}) return part = { 'ipn': row[0], 'manufacturer': row[1] or '', 'mpn': row[2] or '', 'description': row[3] or '', 'class': row[4] or '', 'datasheet': row[5] or '', 'symbol': row[6] or '', 'footprint': row[7] or '' } # Resolve symbol and footprint library paths symbol_info = None footprint_info = None if part['symbol']: symbol_path, symbol_name, symbol_lib = resolve_library_path(part['symbol'], 'symbol') if symbol_path and symbol_name: symbol_info = { 'library_path': symbol_path, 'symbol_name': symbol_name, 'library_name': symbol_lib } if part['footprint']: footprint_path, footprint_name, footprint_lib = resolve_library_path(part['footprint'], 'footprint') if footprint_path and footprint_name: footprint_info = { 'library_path': footprint_path, 'footprint_name': footprint_name, 'library_name': footprint_lib } emit('part_details_result', { 'part': part, 'symbol_info': symbol_info, 'footprint_info': footprint_info }) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('update_part') def handle_update_part(data): try: ipn = data.get('ipn', '').strip() manufacturer = data.get('manufacturer', '').strip() mpn = data.get('mpn', '').strip() description = data.get('description', '').strip() class_value = data.get('class', '').strip() datasheet = data.get('datasheet', '').strip() symbol = data.get('symbol', '').strip() footprint = data.get('footprint', '').strip() if not ipn: emit('library_error', {'error': 'IPN is required'}) return conn = get_db_connection() if not conn: emit('library_error', {'error': 'Could not connect to database'}) return cursor = conn.cursor() # Check if part exists cursor.execute("SELECT ipn FROM parts WHERE ipn = ?", (ipn,)) if not cursor.fetchone(): emit('library_error', {'error': f'Part {ipn} not found in database'}) cursor.close() conn.close() return # Update part cursor.execute(""" UPDATE parts SET manufacturer = ?, mpn = ?, description = ?, class = ?, datasheet = ?, symbol = ?, footprint = ? WHERE ipn = ? """, (manufacturer, mpn, description, class_value, datasheet, symbol, footprint, ipn)) conn.commit() cursor.close() conn.close() emit('part_updated', {'message': f'Successfully updated part {ipn}'}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('get_symbol_libraries') def handle_get_symbol_libraries(): try: libraries = get_symbol_libraries() emit('symbol_libraries_result', {'libraries': libraries}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('get_symbols_in_library') def handle_get_symbols_in_library(data): try: library_path = data.get('library_path', '') if not library_path or not os.path.exists(library_path): emit('library_error', {'error': 'Invalid library path'}) return symbols = parse_kicad_symbol_file(library_path) emit('symbols_in_library_result', {'symbols': symbols}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('render_symbol') def handle_render_symbol(data): try: library_path = data.get('library_path', '') symbol_name = data.get('symbol_name', '') if not library_path or not symbol_name: emit('library_error', {'error': 'Missing library path or symbol name'}) return if not os.path.exists(library_path): emit('library_error', {'error': 'Library file not found'}) return symbol_block = extract_symbol_graphics(library_path, symbol_name) svg = render_symbol_to_svg(symbol_block) if svg: emit('symbol_render_result', {'svg': svg, 'symbol_name': symbol_name}) else: emit('library_error', {'error': f'Could not render symbol {symbol_name}'}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('get_footprint_libraries') def handle_get_footprint_libraries(): try: libraries = get_footprint_libraries() emit('footprint_libraries_result', {'libraries': libraries}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('get_footprints_in_library') def handle_get_footprints_in_library(data): try: library_path = data.get('library_path', '') if not library_path or not os.path.exists(library_path): emit('library_error', {'error': 'Invalid library path'}) return footprints = get_footprints_in_library(library_path) emit('footprints_in_library_result', {'footprints': footprints}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('render_footprint') def handle_render_footprint(data): try: library_path = data.get('library_path', '') footprint_name = data.get('footprint_name', '') if not library_path or not footprint_name: emit('library_error', {'error': 'Missing library path or footprint name'}) return if not os.path.exists(library_path): emit('library_error', {'error': 'Library directory not found'}) return footprint_file = os.path.join(library_path, f'{footprint_name}.kicad_mod') if not os.path.exists(footprint_file): emit('library_error', {'error': f'Footprint file not found: {footprint_name}.kicad_mod'}) return svg = render_footprint_to_svg(footprint_file) if svg: emit('footprint_render_result', {'svg': svg, 'footprint_name': footprint_name}) else: emit('library_error', {'error': f'Could not render footprint {footprint_name}'}) except Exception as e: import traceback emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) # --------------------------------------------------------------------------- # BOM Generation # --------------------------------------------------------------------------- @socketio.on('generate_bom') def handle_generate_bom(): try: manager = get_variant_manager() if not manager: emit('bom_error', {'error': 'No project loaded'}) return schematic_file = app_args.get('Schematic File', '') board_file = app_args.get('Board File', '') project_name = app_args.get('Project Name', 'project') project_dir = app_args.get('Project Dir', '') if not schematic_file: emit('bom_error', {'error': 'No schematic file specified'}) return # Get active variant and DNP parts active_variant = manager.get_active_variant() dnp_uuids = manager.get_dnp_parts(active_variant) emit('bom_status', {'status': 'Generating BOMs...'}) # Call BOM generator script bom_script_path = os.path.join(os.path.dirname(__file__), 'bom_generator.py') # Build command with optional PCB file cmd = [sys.executable, bom_script_path, schematic_file, project_name, active_variant, json.dumps(dnp_uuids)] if board_file: cmd.append(board_file) result = subprocess.run( cmd, capture_output=True, text=True ) if result.returncode == 0: # Create ZIP of all BOMs output_dir = project_dir if project_dir else os.path.dirname(schematic_file) base_name = f"{project_name}_{active_variant}" zip_filename = f"{base_name}_BOMs.zip" zip_path = os.path.join(output_dir, zip_filename) with zipfile.ZipFile(zip_path, 'w') as zipf: # Add all BOM files bom_files = [ f"{base_name}_BOM.xlsx", f"{base_name}_Not_Populated.csv", f"{base_name}_BOM_Top.xlsx", f"{base_name}_BOM_Bottom.xlsx" ] for bom_file in bom_files: full_path = os.path.join(output_dir, bom_file) if os.path.exists(full_path): zipf.write(full_path, bom_file) emit('bom_complete', {'path': zip_path, 'filename': zip_filename}) else: emit('bom_error', {'error': f'BOM generation failed: {result.stderr}'}) except Exception as e: import traceback emit('bom_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @socketio.on('build_all') def handle_build_all(): """Generate all manufacturing outputs: PDFs, Gerbers, Drill, ODB++, STEP, BOMs""" try: kicad_cli = app_args.get('Kicad Cli', '') sch_file = app_args.get('Schematic File', '') board_file = app_args.get('Board File', '') project_dir = app_args.get('Project Dir', '') project_name = app_args.get('Project Name', 'project') variant = app_args.get('Variant', 'default') if not kicad_cli or not sch_file or not board_file: emit('build_error', {'error': 'Schematic or board file not configured'}) return if not os.path.exists(sch_file): emit('build_error', {'error': f'Schematic file not found: {sch_file}'}) return if not os.path.exists(board_file): emit('build_error', {'error': f'Board file not found: {board_file}'}) return # Create output directory timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') output_dir = os.path.join(project_dir, f'manufacturing_{timestamp}') os.makedirs(output_dir, exist_ok=True) # ===== STEP 1: Generate Schematic PDF ===== emit('build_progress', {'percent': 5, 'status': 'Generating schematic PDF...', 'message': 'Starting schematic PDF generation'}) schematics_dir = os.path.join(output_dir, 'schematics') os.makedirs(schematics_dir, exist_ok=True) sch_pdf_path = os.path.join(schematics_dir, f'{project_name}_schematic.pdf') cmd = [kicad_cli, 'sch', 'export', 'pdf', sch_file, '-o', sch_pdf_path] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: emit('build_error', {'error': f'Schematic PDF generation failed: {result.stderr}'}) return # ===== STEP 2: Generate Board Layer PDFs ===== emit('build_progress', {'percent': 10, 'status': 'Generating board layer PDFs...', 'message': 'Schematic PDF complete'}) board_dir = os.path.join(output_dir, 'board') os.makedirs(board_dir, exist_ok=True) temp_pdf_dir = os.path.join(output_dir, 'temp_pdfs') os.makedirs(temp_pdf_dir, exist_ok=True) # All layers to export layers = [ ('F.Cu', 'Top_Copper'), ('B.Cu', 'Bottom_Copper'), ('F.Silkscreen', 'Top_Silkscreen'), ('B.Silkscreen', 'Bottom_Silkscreen'), ('F.Mask', 'Top_Soldermask'), ('B.Mask', 'Bottom_Soldermask'), ('F.Paste', 'Top_Paste'), ('B.Paste', 'Bottom_Paste'), ('Edge.Cuts', 'Board_Outline'), ('F.Fab', 'Top_Fabrication'), ('B.Fab', 'Bottom_Fabrication'), ] pdf_files = [] for layer_name, file_suffix in layers: pdf_path_temp = os.path.join(temp_pdf_dir, f'{file_suffix}_temp.pdf') pdf_path = os.path.join(temp_pdf_dir, f'{file_suffix}.pdf') # Include Edge.Cuts on every layer except the Edge.Cuts layer itself if layer_name == 'Edge.Cuts': layers_to_export = layer_name else: layers_to_export = f"{layer_name},Edge.Cuts" cmd = [ kicad_cli, 'pcb', 'export', 'pdf', board_file, '-l', layers_to_export, '--include-border-title', '-o', pdf_path_temp ] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode == 0: # Add layer name text overlay layer_text = f"Layer: {file_suffix.replace('_', ' ')}" add_text_overlay_to_pdf(pdf_path_temp, pdf_path, layer_text) pdf_files.append(pdf_path) # Merge all PDFs into one if pdf_files: emit('build_progress', {'percent': 25, 'status': 'Merging board layer PDFs...', 'message': f'Generated {len(pdf_files)} layer PDFs'}) merged_pdf_path = os.path.join(board_dir, f'{project_name}.pdf') merger = PdfMerger() for pdf in pdf_files: merger.append(pdf) merger.write(merged_pdf_path) merger.close() # Delete temp PDF directory shutil.rmtree(temp_pdf_dir) # ===== STEP 3: Generate Gerbers ===== emit('build_progress', {'percent': 35, 'status': 'Generating Gerbers...', 'message': 'Board PDFs complete, starting Gerber generation'}) gerber_dir = os.path.join(output_dir, 'gerbers') os.makedirs(gerber_dir, exist_ok=True) cmd = [kicad_cli, 'pcb', 'export', 'gerbers', board_file, '-o', gerber_dir] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: emit('build_error', {'error': f'Gerber generation failed: {result.stderr}'}) return # ===== STEP 4: Generate Drill Files ===== emit('build_progress', {'percent': 50, 'status': 'Generating drill files...', 'message': 'Gerbers complete'}) cmd = [kicad_cli, 'pcb', 'export', 'drill', board_file, '-o', gerber_dir] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: emit('build_progress', {'percent': 55, 'status': 'Drill file warning', 'message': f'Drill generation had issues: {result.stderr}'}) # ===== STEP 5: Generate ODB++ ===== emit('build_progress', {'percent': 60, 'status': 'Generating ODB++...', 'message': 'Drill files complete'}) odb_dir = os.path.join(output_dir, 'odb') os.makedirs(odb_dir, exist_ok=True) odb_file = os.path.join(odb_dir, f'{project_name}.zip') cmd = [kicad_cli, 'pcb', 'export', 'odb', board_file, '-o', odb_file] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: emit('build_progress', {'percent': 65, 'status': 'ODB++ generation skipped', 'message': 'ODB++ not available, continuing...'}) else: emit('build_progress', {'percent': 65, 'status': 'ODB++ complete', 'message': 'ODB++ generation successful'}) # ===== STEP 6: Export STEP ===== emit('build_progress', {'percent': 70, 'status': 'Exporting STEP model...', 'message': 'Starting 3D model export'}) step_file = os.path.join(output_dir, f'{project_name}.step') cmd = [kicad_cli, 'pcb', 'export', 'step', board_file, '-o', step_file, '--subst-models'] result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: emit('build_progress', {'percent': 75, 'status': 'STEP export failed', 'message': f'STEP export error: {result.stderr}'}) else: emit('build_progress', {'percent': 75, 'status': 'STEP export complete', 'message': 'STEP model generated'}) # ===== STEP 7: Generate BOMs ===== emit('build_progress', {'percent': 80, 'status': 'Generating BOMs...', 'message': 'Starting BOM generation'}) bom_script = os.path.join(os.path.dirname(__file__), 'bom_generator.py') # BOM generator expects: schematic_file, project_name, variant_name, [dnp_uuids_json], [pcb_file] cmd = [sys.executable, bom_script, sch_file, project_name, variant, '[]'] if board_file: cmd.append(board_file) result = subprocess.run(cmd, capture_output=True, text=True) if result.returncode != 0: emit('build_progress', {'percent': 90, 'status': 'BOM generation failed', 'message': f'BOM error: {result.stderr}'}) else: # Move BOM files from schematic directory to output directory bom_base_name = f'{project_name}_{variant}' bom_files = [ f'{bom_base_name}_BOM.xlsx', f'{bom_base_name}_Not_Populated.csv', f'{bom_base_name}_BOM_Top.xlsx', f'{bom_base_name}_BOM_Bottom.xlsx' ] for bom_file in bom_files: src_path = os.path.join(project_dir, bom_file) if os.path.exists(src_path): dst_path = os.path.join(output_dir, bom_file) shutil.move(src_path, dst_path) emit('build_progress', {'percent': 90, 'status': 'BOMs complete', 'message': 'BOM files generated'}) # ===== STEP 8: Create ZIP Package ===== emit('build_progress', {'percent': 95, 'status': 'Packaging outputs...', 'message': 'Creating ZIP archive'}) zip_filename = f'{project_name}_manufacturing_{timestamp}.zip' zip_path = os.path.join(project_dir, zip_filename) with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: # Add all files from output directory for root, dirs, files in os.walk(output_dir): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, output_dir) zipf.write(file_path, arcname) emit('build_progress', {'percent': 100, 'status': 'Build complete!', 'message': f'Package ready: {zip_filename}'}) emit('build_complete', {'download_url': f'/download_build/{zip_filename}', 'filename': zip_filename}) except Exception as e: import traceback emit('build_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'}) @app.route('/download_build/') def download_build(filename): """Serve the manufacturing output ZIP file""" sch_file = app_args.get('Schematic File', '') if not sch_file: return "Configuration error", 500 project_dir = os.path.dirname(sch_file) file_path = os.path.join(project_dir, filename) if not os.path.exists(file_path): return "File not found", 404 return send_file(file_path, as_attachment=True, download_name=filename) def shutdown_server(): print("Server stopped") os._exit(0) def parse_args(args): """Parse command line arguments into a dictionary""" parsed = {'executable': args[0] if args else ''} i = 1 while i < len(args): if args[i].startswith('--'): key = args[i][2:].replace('-', ' ').title() if i + 1 < len(args) and not args[i + 1].startswith('--'): parsed[key] = args[i + 1] i += 2 else: parsed[key] = 'true' i += 1 else: i += 1 return parsed if __name__ == '__main__': # Load configuration load_config() # Parse arguments app_args = parse_args(sys.argv) # Open browser after short delay def open_browser(): time.sleep(1.5) webbrowser.open('http://127.0.0.1:5000') threading.Thread(target=open_browser, daemon=True).start() # Run the app print("Starting Flask app...") socketio.run(app, debug=False, host='127.0.0.1', port=5000)