Files
kicad-manager/app.py
brentperteet adffdd211c Add 3D model extraction and download from embedded footprint models
- Parse embedded_files sections in KiCad footprint files
- Extract and decompress (zstd) embedded STEP models
- Add backend endpoint to serve 3D models
- Add UI section to display and download 3D models
- Include Three.js library for future interactive viewing
- Provide download link for extracted STEP files
- Note: Interactive 3D viewing requires STEP to STL/OBJ conversion (future enhancement)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2026-02-27 13:18:34 -06:00

2521 lines
95 KiB
Python

import sys
import webbrowser
import threading
import subprocess
import os
import zipfile
import tempfile
import shutil
import json
import re
import math
from pathlib import Path
from datetime import datetime
from flask import Flask, render_template, request, send_file, jsonify
from flask_socketio import SocketIO, emit
import time
from PyPDF2 import PdfMerger, PdfReader, PdfWriter
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter
from variant_manager import VariantManager
import pyodbc
from io import BytesIO
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app,
cors_allowed_origins="*",
ping_timeout=120, # 2 minutes timeout
ping_interval=25) # Send ping every 25 seconds
# Store arguments
app_args = {}
connected_clients = set()
heartbeat_timeout = 5 # seconds
shutdown_timer = None # Track shutdown timer to cancel if client reconnects
# Configuration
config_file = 'config.json'
app_config = {}
def load_config():
"""Load configuration from file"""
global app_config
if os.path.exists(config_file):
with open(config_file, 'r') as f:
app_config = json.load(f)
else:
app_config = {
'parts_spreadsheet_path': ''
}
return app_config
def save_config():
"""Save configuration to file"""
with open(config_file, 'w') as f:
json.dump(app_config, f, indent=2)
def add_text_overlay_to_pdf(input_pdf_path, output_pdf_path, text, x=50, y=750, font_size=14):
"""Add text overlay to upper left corner of PDF"""
# Read the existing PDF
reader = PdfReader(input_pdf_path)
writer = PdfWriter()
# Create text overlay
packet = BytesIO()
can = canvas.Canvas(packet, pagesize=letter)
can.setFont("Helvetica-Bold", font_size)
can.drawString(x, y, text)
can.save()
# Move to the beginning of the BytesIO buffer
packet.seek(0)
overlay_pdf = PdfReader(packet)
# Merge overlay with each page
for page in reader.pages:
page.merge_page(overlay_pdf.pages[0])
writer.add_page(page)
# Write output
with open(output_pdf_path, 'wb') as output_file:
writer.write(output_file)
# ---------------------------------------------------------------------------
# KiCad Library Functions
# ---------------------------------------------------------------------------
def get_kicad_lib_path():
"""Get the KiCad library path from UM_KICAD environment variable."""
um_kicad = os.environ.get('UM_KICAD')
if not um_kicad:
return None
return os.path.join(um_kicad, 'lib')
def get_symbol_libraries():
"""Get list of all symbol library files."""
lib_path = get_kicad_lib_path()
if not lib_path:
return []
symbols_dir = os.path.join(lib_path, 'symbols')
if not os.path.exists(symbols_dir):
return []
libraries = []
for filename in os.listdir(symbols_dir):
if filename.endswith('.kicad_sym'):
# Remove .kicad_sym extension for display
display_name = filename[:-10] if filename.endswith('.kicad_sym') else filename
libraries.append({
'name': display_name,
'path': os.path.join(symbols_dir, filename)
})
return sorted(libraries, key=lambda x: x['name'])
def get_footprint_libraries():
"""Get list of all footprint libraries (.pretty directories)."""
lib_path = get_kicad_lib_path()
if not lib_path:
return []
footprints_dir = os.path.join(lib_path, 'footprints')
if not os.path.exists(footprints_dir):
return []
libraries = []
for item in os.listdir(footprints_dir):
item_path = os.path.join(footprints_dir, item)
if os.path.isdir(item_path) and item.endswith('.pretty'):
# Remove .pretty extension for display
display_name = item[:-7] if item.endswith('.pretty') else item
libraries.append({
'name': display_name,
'path': item_path
})
return sorted(libraries, key=lambda x: x['name'])
def parse_kicad_symbol_file(file_path):
"""Parse a KiCad symbol library file and extract symbol names."""
symbols = []
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Find all symbols (top-level only, not sub-symbols)
symbol_pattern = r'^\t\(symbol\s+"([^"]+)"'
symbol_names = re.findall(symbol_pattern, content, re.MULTILINE)
# Filter out sub-symbols (those with _X_Y suffix)
for symbol_name in symbol_names:
if not re.match(r'.*_\d+_\d+$', symbol_name):
symbols.append(symbol_name)
except Exception as e:
print(f"Error parsing {file_path}: {e}")
return sorted(symbols)
def get_footprints_in_library(library_path):
"""Get list of all footprints in a .pretty library."""
footprints = []
try:
for filename in os.listdir(library_path):
if filename.endswith('.kicad_mod'):
footprint_name = filename[:-10] # Remove .kicad_mod extension
footprints.append(footprint_name)
except Exception as e:
print(f"Error reading footprints from {library_path}: {e}")
return sorted(footprints)
def extract_embedded_model(footprint_path, footprint_name):
"""Extract embedded 3D model from a KiCad footprint file.
Returns:
dict with 'name', 'data' (base64), 'type' if found, None otherwise
"""
try:
footprint_file = os.path.join(footprint_path, f"{footprint_name}.kicad_mod")
if not os.path.exists(footprint_file):
return None
with open(footprint_file, 'r', encoding='utf-8') as f:
content = f.read()
# Look for embedded_files section
embedded_pattern = r'\(embedded_files\s+(.*?)\n\t\)\s*\n\t\(model'
match = re.search(embedded_pattern, content, re.DOTALL)
if not match:
return None
embedded_section = match.group(1)
# Extract file name
name_match = re.search(r'\(name\s+"([^"]+)"\)', embedded_section)
if not name_match:
return None
model_name = name_match.group(1)
# Extract type
type_match = re.search(r'\(type\s+(\w+)\)', embedded_section)
model_type = type_match.group(1) if type_match else 'model'
# Extract base64 data (multiline, starts with |)
data_match = re.search(r'\(data\s+\|(.*?)\n\t\t\t\)', embedded_section, re.DOTALL)
if not data_match:
return None
# Clean up the base64 data (remove whitespace and newlines)
base64_data = re.sub(r'\s+', '', data_match.group(1))
return {
'name': model_name,
'type': model_type,
'data': base64_data
}
except Exception as e:
print(f"Error extracting embedded model: {e}")
return None
def extract_symbol_graphics(file_path, symbol_name):
"""Extract graphical elements from a symbol for rendering."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Find the symbol definition
symbol_pattern = re.compile(r'^\t\(symbol\s+"' + re.escape(symbol_name) + r'"\s*$', re.MULTILINE)
match = symbol_pattern.search(content)
if not match:
return None
symbol_start = match.start()
# Find the end of this symbol
next_symbol_pattern = re.compile(r'^\t\(symbol\s+"[^"]+"', re.MULTILINE)
next_match = next_symbol_pattern.search(content, symbol_start + len(symbol_name) + 10)
if next_match:
symbol_block = content[symbol_start:next_match.start()]
else:
end_pattern = re.compile(r'^\)', re.MULTILINE)
end_match = end_pattern.search(content, symbol_start + 1)
if end_match:
symbol_block = content[symbol_start:end_match.start()]
else:
symbol_block = content[symbol_start:]
# Filter out alternate De Morgan representations
symbol_block = re.sub(
r'^\t\t\(symbol\s+"' + re.escape(symbol_name) + r'_(\d+)_([2-9])".*?^\t\t\)',
'',
symbol_block,
flags=re.MULTILINE | re.DOTALL
)
return symbol_block
except Exception as e:
print(f"Error extracting graphics for {symbol_name}: {e}")
return None
def render_symbol_to_svg(symbol_block):
"""Convert symbol graphics to SVG."""
if not symbol_block:
return None
svg_elements = []
min_x, min_y, max_x, max_y = 0, 0, 0, 0
# Parse polylines
for polyline in re.finditer(r'\(polyline\n(.*?)\n\t+\)', symbol_block, re.DOTALL):
polyline_text = polyline.group(0)
points = []
# Extract points
pts_match = re.search(r'\(pts\s+(.*?)\n\s*\)', polyline_text, re.DOTALL)
if pts_match:
pts_content = pts_match.group(1)
xy_pattern = r'\(xy\s+([-\d.]+)\s+([-\d.]+)\)'
for match in re.finditer(xy_pattern, pts_content):
x, y = float(match.group(1)), -float(match.group(2))
points.append((x, y))
if points:
stroke_width = 0.254
stroke_match = re.search(r'\(width\s+([-\d.]+)\)', polyline_text)
if stroke_match:
stroke_width = float(stroke_match.group(1))
fill_type = 'none'
fill_match = re.search(r'\(fill\s+\(type\s+(\w+)\)', polyline_text)
if fill_match:
fill_type = fill_match.group(1)
path_d = f"M {points[0][0]} {points[0][1]}"
for x, y in points[1:]:
path_d += f" L {x} {y}"
fill_color = 'none' if fill_type == 'none' else '#FFFFCC'
if fill_type == 'outline':
path_d += ' Z'
fill_color = '#FFFFCC'
svg_elements.append(f'<path d="{path_d}" stroke="#CC0000" stroke-width="{stroke_width}" fill="{fill_color}"/>')
for x, y in points:
min_x, max_x = min(min_x, x), max(max_x, x)
min_y, max_y = min(min_y, y), max(max_y, y)
# Parse circles
for circle in re.finditer(r'\(circle\n(.*?)\n\t+\)', symbol_block, re.DOTALL):
circle_text = circle.group(0)
center_match = re.search(r'\(center\s+([-\d.]+)\s+([-\d.]+)\)', circle_text)
radius_match = re.search(r'\(radius\s+([-\d.]+)\)', circle_text)
if center_match and radius_match:
cx, cy = float(center_match.group(1)), -float(center_match.group(2))
radius = float(radius_match.group(1))
stroke_width = 0.254
stroke_match = re.search(r'\(width\s+([-\d.]+)\)', circle_text)
if stroke_match:
stroke_width = float(stroke_match.group(1))
fill_type = 'none'
fill_match = re.search(r'\(fill\s+\(type\s+(\w+)\)', circle_text)
if fill_match:
fill_type = fill_match.group(1)
fill_color = 'none' if fill_type == 'none' else '#FFFFCC'
svg_elements.append(f'<circle cx="{cx}" cy="{cy}" r="{radius}" stroke="#CC0000" stroke-width="{stroke_width}" fill="{fill_color}"/>')
min_x, max_x = min(min_x, cx - radius), max(max_x, cx + radius)
min_y, max_y = min(min_y, cy - radius), max(max_y, cy + radius)
# Parse rectangles
for rect in re.finditer(r'(\t+)\(rectangle\n.*?\n\1\)', symbol_block, re.DOTALL):
rect_text = rect.group(0)
start_match = re.search(r'\(start\s+([-\d.]+)\s+([-\d.]+)\)', rect_text)
end_match = re.search(r'\(end\s+([-\d.]+)\s+([-\d.]+)\)', rect_text)
if start_match and end_match:
x1, y1 = float(start_match.group(1)), -float(start_match.group(2))
x2, y2 = float(end_match.group(1)), -float(end_match.group(2))
width = abs(x2 - x1)
height = abs(y2 - y1)
x = min(x1, x2)
y = min(y1, y2)
stroke_width = 0.254
stroke_match = re.search(r'\(width\s+([-\d.]+)\)', rect_text)
if stroke_match:
stroke_width = max(float(stroke_match.group(1)), 0.1)
fill_type = 'none'
fill_match = re.search(r'\(fill\s+\(type\s+(\w+)\)', rect_text)
if fill_match:
fill_type = fill_match.group(1)
fill_color = 'none' if fill_type == 'none' else '#FFFFCC'
svg_elements.append(f'<rect x="{x}" y="{y}" width="{width}" height="{height}" stroke="#CC0000" stroke-width="{stroke_width}" fill="{fill_color}"/>')
min_x, max_x = min(min_x, x), max(max_x, x + width)
min_y, max_y = min(min_y, y), max(max_y, y + height)
# Parse text elements
for text in re.finditer(r'\(text\s+"([^"]*)".*?\n\t+\)', symbol_block, re.DOTALL):
text_str = text.group(1)
text_block = text.group(0)
at_match = re.search(r'\(at\s+([-\d.]+)\s+([-\d.]+)', text_block)
if at_match and text_str:
x, y = float(at_match.group(1)), -float(at_match.group(2))
size = 1.27
size_match = re.search(r'\(size\s+([-\d.]+)', text_block)
if size_match:
size = float(size_match.group(1))
svg_elements.append(f'<text x="{x}" y="{y}" font-size="{size}" fill="#000080" font-family="Arial">{text_str}</text>')
# Parse pins
for pin in re.finditer(r'(\t+)\(pin\s+\w+\s+\w+\n.*?\n\1\)', symbol_block, re.DOTALL):
pin_text = pin.group(0)
at_match = re.search(r'\(at\s+([-\d.]+)\s+([-\d.]+)\s+([-\d.]+)\)', pin_text)
length_match = re.search(r'\(length\s+([-\d.]+)\)', pin_text)
number_match = re.search(r'\(number\s+"([^"]*)"', pin_text)
name_match = re.search(r'\(name\s+"([^"]*)"', pin_text)
if at_match and length_match:
x, y = float(at_match.group(1)), -float(at_match.group(2))
angle = float(at_match.group(3))
length = float(length_match.group(1))
number = number_match.group(1) if number_match else ""
name = name_match.group(1) if name_match else ""
# Filter out placeholder names
if name == "~":
name = ""
angle_rad = math.radians(angle)
x2 = x + length * math.cos(angle_rad)
y2 = y - length * math.sin(angle_rad)
svg_elements.append(f'<line x1="{x}" y1="{y}" x2="{x2}" y2="{y2}" stroke="#00CC00" stroke-width="0.254"/>')
svg_elements.append(f'<circle cx="{x2}" cy="{y2}" r="0.5" fill="#00CC00"/>')
# Pin number at the connection point (outside)
if number:
# Position number outside the pin with more offset
num_offset = 1.5
num_x = x - math.cos(angle_rad) * num_offset
num_y = y + math.sin(angle_rad) * num_offset
# Determine text anchor based on pin direction
anchor = "middle"
if angle == 0: # Right - number on left side
anchor = "end"
num_x = x - num_offset
elif angle == 180: # Left - number on right side
anchor = "start"
num_x = x + num_offset
elif angle == 90: # Down - number on top
anchor = "middle"
num_y = y - num_offset
elif angle == 270: # Up - number on bottom
anchor = "middle"
num_y = y + num_offset
svg_elements.append(f'<text x="{num_x}" y="{num_y}" font-size="1.8" fill="#006600" font-family="Arial" text-anchor="{anchor}" dominant-baseline="middle">{number}</text>')
# Pin name at the inner end (inside symbol)
if name:
# Position name inside the symbol with more offset from pin end
name_offset = 1.2
name_x = x2 - math.cos(angle_rad) * name_offset
name_y = y2 + math.sin(angle_rad) * name_offset
# Determine text anchor based on pin direction
anchor = "middle"
if angle == 0: # Right - name inside on right
anchor = "start"
name_x = x2 + name_offset
elif angle == 180: # Left - name inside on left
anchor = "end"
name_x = x2 - name_offset
elif angle == 90: # Down - name inside below
anchor = "middle"
name_y = y2 + name_offset
elif angle == 270: # Up - name inside above
anchor = "middle"
name_y = y2 - name_offset
svg_elements.append(f'<text x="{name_x}" y="{name_y}" font-size="1.6" fill="#003366" font-family="Arial" text-anchor="{anchor}" dominant-baseline="middle">{name}</text>')
min_x = min(min_x, x, x2)
max_x = max(max_x, x, x2)
min_y = min(min_y, y, y2)
max_y = max(max_y, y, y2)
if not svg_elements:
return '<svg xmlns="http://www.w3.org/2000/svg" width="200" height="100"><text x="100" y="50" text-anchor="middle" fill="#999">No preview</text></svg>'
# Add padding
padding = 5
min_x -= padding
min_y -= padding
max_x += padding
max_y += padding
width = max_x - min_x
height = max_y - min_y
scale = 10
svg = f'''<svg xmlns="http://www.w3.org/2000/svg" viewBox="{min_x} {min_y} {width} {height}" width="{width * scale}" height="{height * scale}">
<rect width="100%" height="100%" fill="white"/>
{''.join(svg_elements)}
</svg>'''
return svg
def render_footprint_to_svg(file_path):
"""Render a KiCad footprint to SVG."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
svg_elements = []
min_x, min_y, max_x, max_y = 0, 0, 0, 0
# Parse fp_line elements (silkscreen/fab lines)
for line in re.finditer(r'\(fp_line\s+(.*?)\n\t\)', content, re.DOTALL):
line_text = line.group(1)
start_match = re.search(r'\(start\s+([-\d.]+)\s+([-\d.]+)\)', line_text)
end_match = re.search(r'\(end\s+([-\d.]+)\s+([-\d.]+)\)', line_text)
width_match = re.search(r'\(width\s+([-\d.]+)\)', line_text)
layer_match = re.search(r'\(layer\s+"([^"]+)"\)', line_text)
if start_match and end_match:
x1, y1 = float(start_match.group(1)), float(start_match.group(2))
x2, y2 = float(end_match.group(1)), float(end_match.group(2))
width = float(width_match.group(1)) if width_match else 0.15
layer = layer_match.group(1) if layer_match else "F.SilkS"
# Color by layer
color = "#CC00CC" if "SilkS" in layer else "#888888"
svg_elements.append(f'<line x1="{x1}" y1="{y1}" x2="{x2}" y2="{y2}" stroke="{color}" stroke-width="{width}"/>')
min_x = min(min_x, x1, x2)
max_x = max(max_x, x1, x2)
min_y = min(min_y, y1, y2)
max_y = max(max_y, y1, y2)
# Parse fp_circle elements
for circle in re.finditer(r'\(fp_circle\s+(.*?)\n\t\)', content, re.DOTALL):
circle_text = circle.group(1)
center_match = re.search(r'\(center\s+([-\d.]+)\s+([-\d.]+)\)', circle_text)
end_match = re.search(r'\(end\s+([-\d.]+)\s+([-\d.]+)\)', circle_text)
width_match = re.search(r'\(width\s+([-\d.]+)\)', circle_text)
layer_match = re.search(r'\(layer\s+"([^"]+)"\)', circle_text)
if center_match and end_match:
cx, cy = float(center_match.group(1)), float(center_match.group(2))
ex, ey = float(end_match.group(1)), float(end_match.group(2))
radius = ((ex - cx)**2 + (ey - cy)**2)**0.5
width = float(width_match.group(1)) if width_match else 0.15
layer = layer_match.group(1) if layer_match else "F.SilkS"
color = "#CC00CC" if "SilkS" in layer else "#888888"
svg_elements.append(f'<circle cx="{cx}" cy="{cy}" r="{radius}" stroke="{color}" stroke-width="{width}" fill="none"/>')
min_x = min(min_x, cx - radius)
max_x = max(max_x, cx + radius)
min_y = min(min_y, cy - radius)
max_y = max(max_y, cy + radius)
# Parse fp_rect elements
for rect in re.finditer(r'\(fp_rect\s+(.*?)\n\t\)', content, re.DOTALL):
rect_text = rect.group(1)
start_match = re.search(r'\(start\s+([-\d.]+)\s+([-\d.]+)\)', rect_text)
end_match = re.search(r'\(end\s+([-\d.]+)\s+([-\d.]+)\)', rect_text)
width_match = re.search(r'\(width\s+([-\d.]+)\)', rect_text)
layer_match = re.search(r'\(layer\s+"([^"]+)"\)', rect_text)
if start_match and end_match:
x1, y1 = float(start_match.group(1)), float(start_match.group(2))
x2, y2 = float(end_match.group(1)), float(end_match.group(2))
width = float(width_match.group(1)) if width_match else 0.15
layer = layer_match.group(1) if layer_match else "F.SilkS"
color = "#CC00CC" if "SilkS" in layer else "#888888"
w = abs(x2 - x1)
h = abs(y2 - y1)
x = min(x1, x2)
y = min(y1, y2)
svg_elements.append(f'<rect x="{x}" y="{y}" width="{w}" height="{h}" stroke="{color}" stroke-width="{width}" fill="none"/>')
min_x = min(min_x, x)
max_x = max(max_x, x + w)
min_y = min(min_y, y)
max_y = max(max_y, y + h)
# Parse pads
for pad in re.finditer(r'\(pad\s+"([^"]+)"\s+(\w+)\s+(\w+)\s+(.*?)\n\t\)', content, re.DOTALL):
pad_num = pad.group(1)
pad_type = pad.group(2) # smd, thru_hole, np_thru_hole
pad_shape = pad.group(3) # rect, circle, oval, roundrect
pad_params = pad.group(4)
at_match = re.search(r'\(at\s+([-\d.]+)\s+([-\d.]+)(?:\s+([-\d.]+))?\)', pad_params)
size_match = re.search(r'\(size\s+([-\d.]+)\s+([-\d.]+)\)', pad_params)
if at_match and size_match:
px, py = float(at_match.group(1)), float(at_match.group(2))
rotation = float(at_match.group(3)) if at_match.group(3) else 0
width, height = float(size_match.group(1)), float(size_match.group(2))
# Color based on pad type
pad_color = "#C87533" if pad_type == "smd" else "#FFD700"
# Create transform for rotation (only add if rotation is non-zero)
transform_attr = f' transform="rotate({rotation} {px} {py})"' if rotation != 0 else ''
if pad_shape == "rect":
x = px - width / 2
y = py - height / 2
svg_elements.append(f'<rect x="{x}" y="{y}" width="{width}" height="{height}" fill="{pad_color}" stroke="#8B4513" stroke-width="0.05"{transform_attr}/>')
elif pad_shape == "circle":
r = width / 2
svg_elements.append(f'<circle cx="{px}" cy="{py}" r="{r}" fill="{pad_color}" stroke="#8B4513" stroke-width="0.05"/>')
elif pad_shape == "oval":
rx = width / 2
ry = height / 2
svg_elements.append(f'<ellipse cx="{px}" cy="{py}" rx="{rx}" ry="{ry}" fill="{pad_color}" stroke="#8B4513" stroke-width="0.05"{transform_attr}/>')
elif pad_shape == "roundrect":
# Extract roundrect_rratio if available
rratio_match = re.search(r'\(roundrect_rratio\s+([-\d.]+)\)', pad_params)
rratio = float(rratio_match.group(1)) if rratio_match else 0.25
# Calculate corner radius based on the smaller dimension
corner_radius = min(width, height) * rratio
x = px - width / 2
y = py - height / 2
svg_elements.append(f'<rect x="{x}" y="{y}" width="{width}" height="{height}" rx="{corner_radius}" ry="{corner_radius}" fill="{pad_color}" stroke="#8B4513" stroke-width="0.05"{transform_attr}/>')
# Add pad number
svg_elements.append(f'<text x="{px}" y="{py}" font-size="0.8" fill="white" font-family="Arial" text-anchor="middle" dominant-baseline="middle">{pad_num}</text>')
min_x = min(min_x, px - width / 2)
max_x = max(max_x, px + width / 2)
min_y = min(min_y, py - height / 2)
max_y = max(max_y, py + height / 2)
if not svg_elements:
return '<svg xmlns="http://www.w3.org/2000/svg" width="200" height="100"><text x="100" y="50" text-anchor="middle" fill="#999">No preview</text></svg>'
# Add padding (as percentage of footprint size for better scaling)
width = max_x - min_x
height = max_y - min_y
padding = max(width, height) * 0.15 # 15% padding
min_x -= padding
min_y -= padding
max_x += padding
max_y += padding
width = max_x - min_x
height = max_y - min_y
# Don't set fixed width/height - let it scale to fill container while maintaining aspect ratio
svg = f'''<svg xmlns="http://www.w3.org/2000/svg" viewBox="{min_x} {min_y} {width} {height}" preserveAspectRatio="xMidYMid meet">
<rect width="100%" height="100%" fill="#2C2C2C"/>
{''.join(svg_elements)}
</svg>'''
return svg
except Exception as e:
print(f"Error rendering footprint: {e}")
return None
@app.route('/')
def index():
# Reconstruct the command line that invoked this app
cmd_parts = [sys.argv[0]]
for i in range(1, len(sys.argv)):
arg = sys.argv[i]
# Quote arguments with spaces
if ' ' in arg:
cmd_parts.append(f'"{arg}"')
else:
cmd_parts.append(arg)
invocation_cmd = ' '.join(cmd_parts)
# Check if called with no arguments (library-only mode)
library_only_mode = len(sys.argv) == 1 or not app_args
return render_template('index.html', args=app_args, invocation_cmd=invocation_cmd, library_only_mode=library_only_mode)
@socketio.on('connect')
def handle_connect():
global shutdown_timer
# Cancel shutdown timer if client reconnects
if shutdown_timer is not None:
print("Client reconnected, canceling shutdown")
shutdown_timer.cancel()
shutdown_timer = None
connected_clients.add(request.sid)
print(f"Client connected: {request.sid}")
@socketio.on('disconnect')
def handle_disconnect():
global shutdown_timer
connected_clients.discard(request.sid)
print(f"Client disconnected: {request.sid}")
# Shutdown if no clients connected after waiting for potential reconnect
if not connected_clients:
print("No clients connected. Waiting 2 seconds for reconnect before shutting down...")
shutdown_timer = threading.Timer(2.0, shutdown_server)
shutdown_timer.start()
@socketio.on('heartbeat')
def handle_heartbeat():
emit('heartbeat_ack')
@socketio.on('generate_pdf')
def handle_generate_pdf():
try:
kicad_cli = app_args.get('Kicad Cli', '')
schematic_file = app_args.get('Schematic File', '')
board_file = app_args.get('Board File', '')
project_dir = app_args.get('Project Dir', '')
project_name = app_args.get('Project Name', 'project')
if not kicad_cli:
emit('pdf_error', {'error': 'Missing kicad-cli argument'})
return
# Create temporary directory for PDFs
temp_dir = tempfile.mkdtemp()
schematics_dir = os.path.join(temp_dir, 'schematics')
board_dir = os.path.join(temp_dir, 'board')
os.makedirs(schematics_dir, exist_ok=True)
os.makedirs(board_dir, exist_ok=True)
# Generate schematic PDF
if schematic_file:
emit('pdf_status', {'status': 'Generating schematic PDF...'})
sch_pdf_path = os.path.join(schematics_dir, f'{project_name}_schematic.pdf')
cmd = [kicad_cli, 'sch', 'export', 'pdf', schematic_file, '-o', sch_pdf_path]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
shutil.rmtree(temp_dir)
emit('pdf_error', {'error': f'Schematic PDF failed: {result.stderr}'})
return
# Generate board layer PDFs - one per layer, then merge
if board_file:
emit('pdf_status', {'status': 'Generating board layer PDFs...'})
# All layers to export
layers = [
('F.Cu', 'Top_Copper'),
('B.Cu', 'Bottom_Copper'),
('F.Silkscreen', 'Top_Silkscreen'),
('B.Silkscreen', 'Bottom_Silkscreen'),
('F.Mask', 'Top_Soldermask'),
('B.Mask', 'Bottom_Soldermask'),
('F.Paste', 'Top_Paste'),
('B.Paste', 'Bottom_Paste'),
('Edge.Cuts', 'Board_Outline'),
('F.Fab', 'Top_Fabrication'),
('B.Fab', 'Bottom_Fabrication'),
]
temp_pdf_dir = os.path.join(temp_dir, 'temp_pdfs')
os.makedirs(temp_pdf_dir, exist_ok=True)
pdf_files = []
for layer_name, file_suffix in layers:
pdf_path = os.path.join(temp_pdf_dir, f'{file_suffix}.pdf')
# Include Edge.Cuts on every layer except the Edge.Cuts layer itself
if layer_name == 'Edge.Cuts':
layers_to_export = layer_name
else:
layers_to_export = f"{layer_name},Edge.Cuts"
cmd = [
kicad_cli, 'pcb', 'export', 'pdf',
board_file,
'-l', layers_to_export,
'--include-border-title',
'-o', pdf_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
pdf_files.append(pdf_path)
else:
print(f"Warning: Failed to generate {layer_name}: {result.stderr}")
# Merge all PDFs into one
if pdf_files:
emit('pdf_status', {'status': 'Merging board layer PDFs...'})
merged_pdf_path = os.path.join(board_dir, f'{project_name}.pdf')
merger = PdfMerger()
for pdf in pdf_files:
merger.append(pdf)
merger.write(merged_pdf_path)
merger.close()
# Delete temp PDF directory
shutil.rmtree(temp_pdf_dir)
# Create ZIP file
emit('pdf_status', {'status': 'Creating ZIP archive...'})
zip_filename = f'{project_name}_PDFs.zip'
zip_path = os.path.join(project_dir if project_dir else temp_dir, zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(temp_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, temp_dir)
zipf.write(file_path, arcname)
# Clean up temp directory
shutil.rmtree(temp_dir)
emit('pdf_complete', {'path': zip_path, 'filename': zip_filename})
except Exception as e:
emit('pdf_error', {'error': str(e)})
@socketio.on('generate_gerbers')
def handle_generate_gerbers():
try:
kicad_cli = app_args.get('Kicad Cli', '')
board_file = app_args.get('Board File', '')
project_dir = app_args.get('Project Dir', '')
project_name = app_args.get('Project Name', 'project')
if not kicad_cli or not board_file:
emit('gerber_error', {'error': 'Missing kicad-cli or board-file arguments'})
return
# Create temporary directory for gerbers
temp_dir = tempfile.mkdtemp()
gerber_dir = os.path.join(temp_dir, 'gerbers')
os.makedirs(gerber_dir, exist_ok=True)
# Generate gerbers
emit('gerber_status', {'status': 'Generating gerber files...'})
cmd = [kicad_cli, 'pcb', 'export', 'gerbers', board_file, '-o', gerber_dir]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
shutil.rmtree(temp_dir)
emit('gerber_error', {'error': f'Gerber generation failed: {result.stderr}'})
return
# Generate drill files
emit('gerber_status', {'status': 'Generating drill files...'})
cmd = [kicad_cli, 'pcb', 'export', 'drill', board_file, '-o', gerber_dir]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"Warning: Drill file generation failed: {result.stderr}")
# Generate ODB++ files
emit('gerber_status', {'status': 'Generating ODB++ files...'})
odb_dir = os.path.join(temp_dir, 'odb')
os.makedirs(odb_dir, exist_ok=True)
odb_file = os.path.join(odb_dir, f'{project_name}.zip')
cmd = [kicad_cli, 'pcb', 'export', 'odb', board_file, '-o', odb_file]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"Warning: ODB++ generation failed: {result.stderr}")
# Create ZIP file
emit('gerber_status', {'status': 'Creating ZIP archive...'})
zip_filename = f'{project_name}_fab.zip'
zip_path = os.path.join(project_dir if project_dir else temp_dir, zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Add gerbers folder
for root, dirs, files in os.walk(gerber_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.join('gerbers', os.path.basename(file_path))
zipf.write(file_path, arcname)
# Add odb folder
for root, dirs, files in os.walk(odb_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.join('odb', os.path.relpath(file_path, odb_dir))
zipf.write(file_path, arcname)
# Clean up temp directory
shutil.rmtree(temp_dir)
emit('gerber_complete', {'path': zip_path, 'filename': zip_filename})
except Exception as e:
emit('gerber_error', {'error': str(e)})
@socketio.on('export_step')
def handle_export_step():
try:
kicad_cli = app_args.get('Kicad Cli', '')
board_file = app_args.get('Board File', '')
project_dir = app_args.get('Project Dir', '')
project_name = app_args.get('Project Name', 'project')
if not kicad_cli or not board_file:
emit('step_error', {'error': 'Missing kicad-cli or board-file arguments'})
return
# Create output filename
step_filename = f'{project_name}.step'
step_path = os.path.join(project_dir if project_dir else os.path.dirname(board_file), step_filename)
# Generate STEP file
emit('step_status', {'status': 'Exporting PCB to STEP format...'})
cmd = [kicad_cli, 'pcb', 'export', 'step', board_file, '-o', step_path]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
emit('step_error', {'error': f'STEP export failed: {result.stderr}'})
return
if not os.path.exists(step_path):
emit('step_error', {'error': 'STEP file was not created'})
return
emit('step_complete', {'path': step_path, 'filename': step_filename})
except Exception as e:
emit('step_error', {'error': str(e)})
@socketio.on('render_pcb')
def handle_render_pcb():
try:
kicad_cli = app_args.get('Kicad Cli', '')
board_file = app_args.get('Board File', '')
project_dir = app_args.get('Project Dir', '')
project_name = app_args.get('Project Name', 'project')
if not kicad_cli or not board_file:
emit('render_error', {'error': 'Missing kicad-cli or board-file arguments'})
return
# Create output filename
render_filename = f'{project_name}_iso_view.png'
render_path = os.path.join(project_dir if project_dir else os.path.dirname(board_file), render_filename)
# Render PCB with isometric view
emit('render_status', {'status': 'Rendering PCB image...'})
cmd = [
kicad_cli, 'pcb', 'render', board_file,
'--rotate', '25,0,45',
'-o', render_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
error_msg = f'PCB render failed:\nCommand: {" ".join(cmd)}\nStderr: {result.stderr}\nStdout: {result.stdout}'
emit('render_error', {'error': error_msg})
return
if not os.path.exists(render_path):
emit('render_error', {'error': 'Rendered image was not created'})
return
emit('render_complete', {'path': render_path, 'filename': render_filename})
except Exception as e:
emit('render_error', {'error': str(e)})
@socketio.on('sync_libraries')
def handle_sync_libraries():
try:
emit('sync_status', {'status': 'Starting library synchronization...'})
# Check if UM_KICAD is set
um_kicad = os.environ.get('UM_KICAD')
if not um_kicad:
emit('sync_error', {'error': 'UM_KICAD environment variable is not set in the Flask app environment'})
return
emit('sync_status', {'status': f'UM_KICAD is set to: {um_kicad}'})
# Run the add_libraries.py script
script_path = os.path.join(os.path.dirname(__file__), 'add_libraries.py')
if not os.path.exists(script_path):
emit('sync_error', {'error': 'add_libraries.py script not found'})
return
result = subprocess.run(
[sys.executable, script_path],
capture_output=True,
text=True,
env=os.environ.copy()
)
output = result.stdout + result.stderr
if result.returncode == 0:
emit('sync_complete', {'output': output})
else:
emit('sync_error', {'error': f'Sync failed:\n{output}'})
except Exception as e:
emit('sync_error', {'error': str(e)})
@socketio.on('sync_database')
def handle_sync_database():
try:
emit('db_sync_status', {'status': 'Starting database synchronization...'})
# Get the parts spreadsheet path from config
parts_spreadsheet = app_config.get('parts_spreadsheet_path', '')
if not parts_spreadsheet:
emit('db_sync_error', {'error': 'Parts spreadsheet path not configured. Please set it in Settings.'})
return
if not os.path.exists(parts_spreadsheet):
emit('db_sync_error', {'error': f'Parts spreadsheet not found at: {parts_spreadsheet}'})
return
emit('db_sync_status', {'status': f'Using parts spreadsheet: {parts_spreadsheet}'})
# Run the gen_resistors_db.py script
script_path = os.path.join(os.path.dirname(__file__), 'gen_resistors_db.py')
if not os.path.exists(script_path):
emit('db_sync_error', {'error': 'gen_resistors_db.py script not found'})
return
result = subprocess.run(
[sys.executable, script_path, parts_spreadsheet],
capture_output=True,
text=True,
env=os.environ.copy()
)
output = result.stdout + result.stderr
if result.returncode == 0:
emit('db_sync_complete', {'output': output})
else:
emit('db_sync_error', {'error': f'Database sync failed:\n{output}'})
except Exception as e:
emit('db_sync_error', {'error': str(e)})
@socketio.on('init_user')
def handle_init_user():
try:
emit('init_status', {'status': 'Starting user environment initialization...'})
# Check if UM_KICAD is set
um_kicad = os.environ.get('UM_KICAD')
if not um_kicad:
emit('init_error', {'error': 'UM_KICAD environment variable is not set'})
return
emit('init_status', {'status': f'UM_KICAD: {um_kicad}'})
# Run the init_user.py script
script_path = os.path.join(os.path.dirname(__file__), 'init_user.py')
if not os.path.exists(script_path):
emit('init_error', {'error': 'init_user.py script not found'})
return
result = subprocess.run(
[sys.executable, script_path],
capture_output=True,
text=True,
env=os.environ.copy()
)
output = result.stdout + result.stderr
if result.returncode == 0:
emit('init_complete', {'output': output})
else:
emit('init_error', {'error': f'Initialization failed:\n{output}'})
except Exception as e:
emit('init_error', {'error': str(e)})
@app.route('/download/<path:filename>')
def download_file(filename):
project_dir = app_args.get('Project Dir', '')
file_path = os.path.join(project_dir, filename)
if os.path.exists(file_path):
return send_file(file_path, as_attachment=True)
return "File not found", 404
@app.route('/config', methods=['GET', 'POST'])
def config():
if request.method == 'POST':
data = request.get_json()
app_config['parts_spreadsheet_path'] = data.get('parts_spreadsheet_path', '')
save_config()
return jsonify({'status': 'success', 'config': app_config})
else:
return jsonify(app_config)
@app.route('/variants')
def variants_page():
return render_template('variants.html')
# ---------------------------------------------------------------------------
# Variant Management Socket Handlers
# ---------------------------------------------------------------------------
def get_variant_manager():
"""Get VariantManager instance for current project"""
schematic_file = app_args.get('Schematic File', '')
if not schematic_file or not os.path.exists(schematic_file):
return None
return VariantManager(schematic_file)
def get_all_schematic_files(root_schematic):
"""Get all schematic files in a hierarchical design"""
from pathlib import Path
root_path = Path(root_schematic)
if not root_path.exists():
return [root_schematic]
schematic_files = [str(root_path)]
schematic_dir = root_path.parent
try:
with open(root_path, 'r', encoding='utf-8') as f:
content = f.read()
for line in content.split('\n'):
if '(property "Sheetfile"' in line:
parts = line.split('"')
if len(parts) >= 4:
sheet_file = parts[3]
sheet_path = schematic_dir / sheet_file
if sheet_path.exists():
sub_sheets = get_all_schematic_files(str(sheet_path))
for sub in sub_sheets:
if sub not in schematic_files:
schematic_files.append(sub)
except:
pass
return schematic_files
def get_all_parts_from_schematic():
"""Get all component references, values, and UUIDs from all schematics (including hierarchical sheets)"""
schematic_file = app_args.get('Schematic File', '')
if not schematic_file or not os.path.exists(schematic_file):
return []
# Get all schematic files
all_schematics = get_all_schematic_files(schematic_file)
all_parts = {} # uuid -> {reference, value}
for sch_file in all_schematics:
try:
with open(sch_file, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.split('\n')
in_symbol = False
current_uuid = None
current_ref = None
current_value = None
current_lib_id = None
for line in lines:
stripped = line.strip()
# Detect start of symbol
if stripped.startswith('(symbol'):
in_symbol = True
current_uuid = None
current_ref = None
current_value = None
current_lib_id = None
# Detect end of symbol
elif in_symbol and stripped == ')':
# Save the part if we have all the info, excluding power symbols
is_power = current_lib_id and 'power:' in current_lib_id
is_power = is_power or (current_ref and current_ref.startswith('#'))
if current_uuid and current_ref and not is_power and len(current_ref) > 1:
all_parts[current_uuid] = {
'reference': current_ref,
'value': current_value or ''
}
in_symbol = False
# Extract lib_id to check for power symbols
elif in_symbol and '(lib_id' in stripped:
lib_parts = line.split('"')
if len(lib_parts) >= 2:
current_lib_id = lib_parts[1]
# Extract UUID
elif in_symbol and '(uuid' in stripped:
uuid_parts = line.split('"')
if len(uuid_parts) >= 2:
current_uuid = uuid_parts[1]
# Extract reference - format: (property "Reference" "U1" ...
elif in_symbol and '(property "Reference"' in line:
try:
start = line.find('"Reference"') + len('"Reference"')
remainder = line[start:]
quote_start = remainder.find('"')
if quote_start != -1:
quote_end = remainder.find('"', quote_start + 1)
if quote_end != -1:
current_ref = remainder[quote_start + 1:quote_end]
except:
pass
# Extract value - format: (property "Value" "LM358" ...
elif in_symbol and '(property "Value"' in line:
try:
start = line.find('"Value"') + len('"Value"')
remainder = line[start:]
quote_start = remainder.find('"')
if quote_start != -1:
quote_end = remainder.find('"', quote_start + 1)
if quote_end != -1:
current_value = remainder[quote_start + 1:quote_end]
except:
pass
except Exception as e:
print(f"Error reading schematic {sch_file}: {e}")
return [{'uuid': uuid, 'reference': data['reference'], 'value': data['value']}
for uuid, data in sorted(all_parts.items(), key=lambda x: x[1]['reference'])]
@socketio.on('get_variants')
def handle_get_variants():
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
all_parts = get_all_parts_from_schematic()
emit('variants_data', {
'project_name': manager.project_name,
'variants': manager.get_variants(),
'active_variant': manager.get_active_variant(),
'all_parts': all_parts # Now includes uuid, reference, and value
})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('create_variant')
def handle_create_variant(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
name = data.get('name', '')
description = data.get('description', '')
based_on = data.get('based_on', None)
if not name:
emit('variant_error', {'error': 'Variant name required'})
return
success = manager.create_variant(name, description, based_on)
if success:
emit('variant_updated', {'message': f'Variant "{name}" created'})
else:
emit('variant_error', {'error': f'Variant "{name}" already exists'})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('delete_variant')
def handle_delete_variant(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
name = data.get('name', '')
success = manager.delete_variant(name)
if success:
emit('variant_updated', {'message': f'Variant "{name}" deleted'})
else:
emit('variant_error', {'error': f'Cannot delete variant "{name}"'})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('activate_variant')
def handle_activate_variant(data):
try:
import pygetwindow as gw
import pyautogui
import time
import win32gui
import win32con
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
name = data.get('name', '')
schematic_file = app_args.get('Schematic File', '')
kicad_cli = app_args.get('Kicad Cli', 'kicad-cli')
# Step 1: Save and close schematic window
emit('variant_status', {'status': 'Looking for KiCad schematic window...'})
all_windows = gw.getAllTitles()
windows = gw.getWindowsWithTitle('Schematic Editor')
window_found = False
if not windows:
schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()]
if schematic_windows:
windows = gw.getWindowsWithTitle(schematic_windows[0])
window_found = len(windows) > 0
else:
window_found = True
# If window is found, save and close it
if window_found:
window = windows[0]
emit('variant_status', {'status': f'Saving and closing: {window.title}'})
hwnd = window._hWnd
rect = win32gui.GetWindowRect(hwnd)
x, y, x2, y2 = rect
width = x2 - x
# Click to activate
click_x = x + width // 2
click_y = y + 10
pyautogui.click(click_x, click_y)
time.sleep(0.5)
# Save
pyautogui.hotkey('ctrl', 's')
time.sleep(1.0)
# Close
win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
time.sleep(1.0)
# Try Alt+F4 if still open
if win32gui.IsWindow(hwnd):
try:
rect = win32gui.GetWindowRect(hwnd)
x, y, x2, y2 = rect
click_x = x + (x2 - x) // 2
click_y = y + 10
pyautogui.click(click_x, click_y)
time.sleep(0.3)
except:
pass
pyautogui.hotkey('alt', 'F4')
time.sleep(1.0)
# Step 2: Sync current variant from schematic
current_variant = manager.get_active_variant()
emit('variant_status', {'status': f'Syncing current variant "{current_variant}"...'})
from sync_variant import sync_variant_from_schematic
try:
sync_success = sync_variant_from_schematic(schematic_file, current_variant)
if sync_success:
manager = get_variant_manager()
else:
print(f"Warning: Sync of variant '{current_variant}' failed")
except Exception as e:
print(f"Error during sync: {e}")
# Step 3: Activate new variant and apply to schematic
emit('variant_status', {'status': f'Activating variant "{name}"...'})
success = manager.set_active_variant(name)
if success:
apply_script_path = os.path.join(os.path.dirname(__file__), 'apply_variant.py')
result = subprocess.run(
[sys.executable, apply_script_path, schematic_file, name, kicad_cli],
capture_output=True,
text=True
)
if result.returncode != 0:
error_msg = result.stderr if result.stderr else result.stdout
emit('variant_error', {'error': f'Failed to apply variant: {error_msg}'})
return
else:
emit('variant_error', {'error': f'Variant "{name}" not found'})
return
# Step 4: Reopen schematic editor
emit('variant_status', {'status': 'Waiting 2 seconds before reopening...'})
time.sleep(2.0)
emit('variant_status', {'status': 'Reopening schematic editor...'})
kicad_bin_dir = r"C:\Program Files\KiCad\9.0\bin"
if not os.path.exists(kicad_bin_dir):
kicad_bin_dir = r"C:\Program Files\KiCad\8.0\bin"
eeschema_exe = os.path.join(kicad_bin_dir, "eeschema.exe")
if os.path.exists(eeschema_exe):
subprocess.Popen([eeschema_exe, schematic_file], shell=False)
time.sleep(2.0)
else:
emit('variant_status', {'status': f'Warning: eeschema.exe not found at {eeschema_exe}'})
emit('variant_updated', {'message': f'Activated variant "{name}" and reopened schematic'})
except ImportError as e:
emit('variant_error', {'error': f'Missing library: {str(e)}. Install: pip install pygetwindow pyautogui pywin32'})
except Exception as e:
import traceback
emit('variant_error', {'error': f'{str(e)}\n{traceback.format_exc()}'})
@socketio.on('get_variant_parts')
def handle_get_variant_parts(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
variant_name = data.get('variant', '')
all_parts = get_all_parts_from_schematic()
dnp_uuids = manager.get_dnp_parts(variant_name)
parts_data = []
for part in all_parts:
parts_data.append({
'uuid': part['uuid'],
'reference': part['reference'],
'value': part['value'],
'is_dnp': part['uuid'] in dnp_uuids
})
emit('variant_parts_data', {'parts': parts_data})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('set_part_dnp')
def handle_set_part_dnp(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
variant = data.get('variant', '')
uuid = data.get('uuid', '')
is_dnp = data.get('is_dnp', False)
success = manager.set_part_dnp(variant, uuid, is_dnp)
if success:
# Re-send updated parts list
handle_get_variant_parts({'variant': variant})
else:
emit('variant_error', {'error': 'Failed to update part'})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('sync_from_schematic')
def handle_sync_from_schematic():
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
# Read DNP state from schematic and update active variant
schematic_file = app_args.get('Schematic File', '')
script_path = os.path.join(os.path.dirname(__file__), 'sync_variant.py')
result = subprocess.run(
[sys.executable, script_path, schematic_file],
capture_output=True,
text=True
)
if result.returncode == 0:
emit('sync_complete', {'message': f'Synced from schematic:\n{result.stdout}'})
else:
emit('variant_error', {'error': f'Failed to sync: {result.stderr}'})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('test_window_interaction')
def handle_test_window_interaction():
try:
import pygetwindow as gw
import pyautogui
import time
import win32gui
import win32con
emit('window_test_status', {'status': 'Looking for KiCad schematic window...'})
# List all windows for debugging
all_windows = gw.getAllTitles()
emit('window_test_status', {'status': f'DEBUG: Found {len(all_windows)} windows total'})
# Find KiCad schematic editor window
windows = gw.getWindowsWithTitle('Schematic Editor')
emit('window_test_status', {'status': f'DEBUG: Found {len(windows)} windows with "Schematic Editor"'})
window_found = False
if not windows:
# Try alternative window title
schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()]
emit('window_test_status', {'status': f'DEBUG: Found {len(schematic_windows)} windows with "kicad" and "schematic"'})
if schematic_windows:
emit('window_test_status', {'status': f'DEBUG: Using window: {schematic_windows[0]}'})
windows = gw.getWindowsWithTitle(schematic_windows[0])
window_found = len(windows) > 0
else:
window_found = True
# If window is found, close it
if window_found:
window = windows[0]
emit('window_test_status', {'status': f'Found window: "{window.title}"'})
# Get window position and size
hwnd = window._hWnd
rect = win32gui.GetWindowRect(hwnd)
x, y, x2, y2 = rect
width = x2 - x
height = y2 - y
emit('window_test_status', {'status': f'DEBUG: Window position=({x},{y}), size=({width}x{height})'})
# Click on the window's title bar to activate it (more reliable than SetForegroundWindow)
click_x = x + width // 2
click_y = y + 10 # Title bar is usually at the top
emit('window_test_status', {'status': f'Clicking window at ({click_x}, {click_y}) to activate...'})
pyautogui.click(click_x, click_y)
time.sleep(0.5)
emit('window_test_status', {'status': 'Sending Ctrl+S (save)...'})
pyautogui.hotkey('ctrl', 's')
time.sleep(1.0)
emit('window_test_status', {'status': 'Attempting to close window...'})
# Method 1: Try WM_CLOSE message
emit('window_test_status', {'status': 'DEBUG: Sending WM_CLOSE message...'})
win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
time.sleep(1.0)
# Check if window still exists
if win32gui.IsWindow(hwnd):
emit('window_test_status', {'status': 'DEBUG: Window still exists after WM_CLOSE, trying to click and send Alt+F4...'})
# Click on window again to make sure it has focus
try:
rect = win32gui.GetWindowRect(hwnd)
x, y, x2, y2 = rect
click_x = x + (x2 - x) // 2
click_y = y + 10
pyautogui.click(click_x, click_y)
time.sleep(0.3)
except:
emit('window_test_status', {'status': 'DEBUG: Could not click window (may already be closed)'})
pyautogui.hotkey('alt', 'F4')
time.sleep(1.0)
# Final check
if win32gui.IsWindow(hwnd):
emit('window_test_status', {'status': 'DEBUG: Window still exists after Alt+F4 - may need manual intervention'})
else:
emit('window_test_status', {'status': 'DEBUG: Window closed successfully via Alt+F4'})
else:
emit('window_test_status', {'status': 'DEBUG: Window closed successfully via WM_CLOSE'})
else:
emit('window_test_status', {'status': 'No KiCad schematic window found - will open it'})
# Wait a couple seconds before reopening
emit('window_test_status', {'status': 'Waiting 2 seconds before reopening...'})
time.sleep(2.0)
# Reopen the schematic editor
schematic_file = app_args.get('Schematic File', '')
if not schematic_file:
emit('window_test_error', {'error': 'No schematic file specified in app arguments'})
return
emit('window_test_status', {'status': f'Relaunching schematic editor with: {schematic_file}'})
# Launch KiCad schematic editor
# The schematic editor executable is typically in the same directory as kicad.exe
import os
kicad_bin_dir = r"C:\Program Files\KiCad\9.0\bin" # Default KiCad 9 installation path
if not os.path.exists(kicad_bin_dir):
kicad_bin_dir = r"C:\Program Files\KiCad\8.0\bin" # Try KiCad 8
eeschema_exe = os.path.join(kicad_bin_dir, "eeschema.exe")
if not os.path.exists(eeschema_exe):
emit('window_test_error', {'error': f'KiCad executable not found at: {eeschema_exe}'})
return
emit('window_test_status', {'status': f'DEBUG: Launching {eeschema_exe} {schematic_file}'})
# Launch KiCad with the schematic file
result = subprocess.Popen([eeschema_exe, schematic_file], shell=False)
emit('window_test_status', {'status': f'DEBUG: Process started with PID {result.pid}'})
time.sleep(2.0)
# Verify the window opened
all_windows = gw.getAllTitles()
schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()]
if schematic_windows:
emit('window_test_status', {'status': f'Successfully reopened schematic: {schematic_windows[0]}'})
else:
emit('window_test_status', {'status': 'Schematic editor launched but window not detected yet'})
emit('window_test_complete', {'message': 'Window interaction test completed!'})
except ImportError as e:
emit('window_test_error', {'error': f'Missing required library: {str(e)}. Please install: pip install pygetwindow pyautogui pywin32'})
except Exception as e:
import traceback
emit('window_test_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
# ---------------------------------------------------------------------------
# Library Management
# ---------------------------------------------------------------------------
def get_db_connection():
"""Get ODBC connection to the parts database"""
try:
# Use DSN connection as configured in KiCad database library
conn_str = "DSN=UM_KiCad_Parts;"
conn = pyodbc.connect(conn_str)
return conn
except Exception as e:
print(f"Database connection error: {e}")
return None
@socketio.on('search_parts')
def handle_search_parts(data):
try:
search_query = data.get('query', '').strip()
conn = get_db_connection()
if not conn:
emit('library_error', {'error': 'Could not connect to database'})
return
cursor = conn.cursor()
# Build search query - search across ipn, mpn, manufacturer, description, symbol, and footprint
# Column names in SQLite are lowercase
if search_query:
sql = """
SELECT ipn, mpn, manufacturer, description, datasheet
FROM parts
WHERE ipn LIKE ?
OR mpn LIKE ?
OR manufacturer LIKE ?
OR description LIKE ?
OR symbol LIKE ?
OR footprint LIKE ?
ORDER BY ipn
LIMIT 100
"""
search_param = f'%{search_query}%'
cursor.execute(sql, (search_param, search_param, search_param, search_param, search_param, search_param))
else:
# No search query - return first 100 parts
sql = """
SELECT ipn, mpn, manufacturer, description, datasheet
FROM parts
ORDER BY ipn
LIMIT 100
"""
cursor.execute(sql)
rows = cursor.fetchall()
parts = []
for row in rows:
parts.append({
'ipn': row[0] if row[0] else '',
'mpn': row[1] if row[1] else '',
'manufacturer': row[2] if row[2] else '',
'description': row[3] if row[3] else '',
'datasheet': row[4] if row[4] else ''
})
cursor.close()
conn.close()
emit('library_search_results', {'parts': parts, 'count': len(parts)})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('get_missing_ipns')
def handle_get_missing_ipns():
try:
# Load config to get spreadsheet path
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
with open(config_path, 'r') as f:
config = json.load(f)
spreadsheet_path = config.get('parts_spreadsheet_path', '')
if not spreadsheet_path or not os.path.exists(spreadsheet_path):
emit('library_error', {'error': 'Parts spreadsheet not found'})
return
# Get all IPNs from database
conn = get_db_connection()
if not conn:
emit('library_error', {'error': 'Could not connect to database'})
return
cursor = conn.cursor()
cursor.execute("SELECT ipn FROM parts")
db_ipns = set(row.ipn for row in cursor.fetchall() if row.ipn)
cursor.close()
conn.close()
# Read spreadsheet
import openpyxl
wb = openpyxl.load_workbook(spreadsheet_path, read_only=True, data_only=True)
ws = wb.active
# Find header row and column indices
header_row = None
for row in ws.iter_rows(min_row=1, max_row=10, values_only=True):
if row and 'GLE P/N' in row:
header_row = row
break
if not header_row:
emit('library_error', {'error': 'Could not find header row in spreadsheet'})
return
ipn_col = header_row.index('GLE P/N')
desc_col = header_row.index('Description') if 'Description' in header_row else None
class_col = header_row.index('Class') if 'Class' in header_row else None
mfg_col = header_row.index('Mfg.1') if 'Mfg.1' in header_row else None
mpn_col = header_row.index('Mfg.1 P/N') if 'Mfg.1 P/N' in header_row else None
# Collect missing IPNs
missing_parts = []
for row in ws.iter_rows(min_row=2, values_only=True):
if not row or not row[ipn_col]:
continue
ipn = str(row[ipn_col]).strip()
if not ipn or ipn in db_ipns:
continue
# Get manufacturer and MPN
manufacturer = str(row[mfg_col]).strip() if mfg_col and row[mfg_col] else ''
mpn = str(row[mpn_col]).strip() if mpn_col and row[mpn_col] else ''
# Skip section headers - rows with IPN and description but no manufacturer or MPN
if not manufacturer and not mpn:
continue
# Get and normalize class field
class_value = str(row[class_col]).strip() if class_col and row[class_col] else ''
if class_value:
# Replace spaces and special characters with underscore, collapse multiple underscores
import re
class_value = re.sub(r'[^a-zA-Z0-9]+', '_', class_value.upper())
class_value = re.sub(r'_+', '_', class_value).strip('_')
part = {
'ipn': ipn,
'description': str(row[desc_col]).strip() if desc_col and row[desc_col] else '',
'class': class_value,
'manufacturer': manufacturer,
'mpn': mpn
}
missing_parts.append(part)
wb.close()
# Sort by IPN
missing_parts.sort(key=lambda x: x['ipn'])
emit('missing_ipns_result', {'parts': missing_parts, 'count': len(missing_parts)})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('create_part')
def handle_create_part(data):
try:
ipn = data.get('ipn', '').strip()
manufacturer = data.get('manufacturer', '').strip()
mpn = data.get('mpn', '').strip()
description = data.get('description', '').strip()
class_value = data.get('class', '').strip()
datasheet = data.get('datasheet', '').strip()
symbol = data.get('symbol', '').strip()
footprint = data.get('footprint', '').strip()
if not ipn:
emit('library_error', {'error': 'IPN is required'})
return
conn = get_db_connection()
if not conn:
emit('library_error', {'error': 'Could not connect to database'})
return
cursor = conn.cursor()
# Check if part already exists
cursor.execute("SELECT ipn FROM parts WHERE ipn = ?", (ipn,))
if cursor.fetchone():
emit('library_error', {'error': f'Part {ipn} already exists in database'})
cursor.close()
conn.close()
return
# Insert new part
cursor.execute("""
INSERT INTO parts (ipn, manufacturer, mpn, description, class, datasheet, symbol, footprint)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (ipn, manufacturer, mpn, description, class_value, datasheet, symbol, footprint))
conn.commit()
cursor.close()
conn.close()
emit('part_created', {'message': f'Successfully created part {ipn}'})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
def resolve_library_path(lib_ref, lib_type='symbol'):
"""Resolve library reference to full path using KiCad library tables.
Args:
lib_ref: Library reference in format LibraryName:ItemName
lib_type: 'symbol' or 'footprint'
Returns:
tuple: (library_path, item_name, library_name) or (None, None, None) if not found
"""
try:
if ':' not in lib_ref:
return (None, None, None)
lib_name, item_name = lib_ref.split(':', 1)
# Get KiCad config directory
kicad_config_dir = os.path.expanduser(os.path.join('~', 'AppData', 'Roaming', 'kicad', '9.0'))
if lib_type == 'symbol':
table_file = os.path.join(kicad_config_dir, 'sym-lib-table')
else:
table_file = os.path.join(kicad_config_dir, 'fp-lib-table')
if not os.path.exists(table_file):
return (None, None, None)
# Parse the library table file
with open(table_file, 'r', encoding='utf-8') as f:
content = f.read()
# Simple s-expression parsing to find the library entry
# Handle both single-line and multi-line formats
# Pattern matches (lib ... (name "X") ... (uri "Y") ... )
import re
# Use DOTALL flag to match across newlines
pattern = r'\(lib\s+.*?\(name\s+"([^"]+)"\).*?\(uri\s+"([^"]+)"\).*?\)'
matches = re.findall(pattern, content, re.DOTALL)
for match_name, match_uri in matches:
if match_name == lib_name:
# Resolve environment variables in the path
lib_path = match_uri
# Replace common KiCad environment variables
env_vars = {
'${KICAD9_SYMBOL_DIR}': os.environ.get('KICAD9_SYMBOL_DIR', ''),
'${KICAD9_FOOTPRINT_DIR}': os.environ.get('KICAD9_FOOTPRINT_DIR', ''),
'${UM_KICAD}': os.environ.get('UM_KICAD', '')
}
for var, val in env_vars.items():
if var in lib_path and val:
lib_path = lib_path.replace(var, val)
# Convert to absolute path with proper separators
lib_path = os.path.normpath(lib_path.replace('/', os.sep))
if os.path.exists(lib_path):
return (lib_path, item_name, lib_name)
return (None, None, None)
except Exception as e:
print(f"Error resolving library path: {e}")
return (None, None, None)
@socketio.on('get_part_details')
def handle_get_part_details(data):
try:
ipn = data.get('ipn', '').strip()
if not ipn:
emit('library_error', {'error': 'IPN is required'})
return
conn = get_db_connection()
if not conn:
emit('library_error', {'error': 'Could not connect to database'})
return
cursor = conn.cursor()
cursor.execute("""
SELECT ipn, manufacturer, mpn, description, class, datasheet, symbol, footprint
FROM parts WHERE ipn = ?
""", (ipn,))
row = cursor.fetchone()
cursor.close()
conn.close()
if not row:
emit('library_error', {'error': f'Part {ipn} not found'})
return
part = {
'ipn': row[0],
'manufacturer': row[1] or '',
'mpn': row[2] or '',
'description': row[3] or '',
'class': row[4] or '',
'datasheet': row[5] or '',
'symbol': row[6] or '',
'footprint': row[7] or ''
}
# Resolve symbol and footprint library paths
symbol_info = None
footprint_info = None
if part['symbol']:
symbol_path, symbol_name, symbol_lib = resolve_library_path(part['symbol'], 'symbol')
if symbol_path and symbol_name:
symbol_info = {
'library_path': symbol_path,
'symbol_name': symbol_name,
'library_name': symbol_lib
}
if part['footprint']:
footprint_path, footprint_name, footprint_lib = resolve_library_path(part['footprint'], 'footprint')
if footprint_path and footprint_name:
footprint_info = {
'library_path': footprint_path,
'footprint_name': footprint_name,
'library_name': footprint_lib
}
emit('part_details_result', {
'part': part,
'symbol_info': symbol_info,
'footprint_info': footprint_info
})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('update_part')
def handle_update_part(data):
try:
ipn = data.get('ipn', '').strip()
manufacturer = data.get('manufacturer', '').strip()
mpn = data.get('mpn', '').strip()
description = data.get('description', '').strip()
class_value = data.get('class', '').strip()
datasheet = data.get('datasheet', '').strip()
symbol = data.get('symbol', '').strip()
footprint = data.get('footprint', '').strip()
if not ipn:
emit('library_error', {'error': 'IPN is required'})
return
conn = get_db_connection()
if not conn:
emit('library_error', {'error': 'Could not connect to database'})
return
cursor = conn.cursor()
# Check if part exists
cursor.execute("SELECT ipn FROM parts WHERE ipn = ?", (ipn,))
if not cursor.fetchone():
emit('library_error', {'error': f'Part {ipn} not found in database'})
cursor.close()
conn.close()
return
# Update part
cursor.execute("""
UPDATE parts
SET manufacturer = ?, mpn = ?, description = ?, class = ?, datasheet = ?, symbol = ?, footprint = ?
WHERE ipn = ?
""", (manufacturer, mpn, description, class_value, datasheet, symbol, footprint, ipn))
conn.commit()
cursor.close()
conn.close()
emit('part_updated', {'message': f'Successfully updated part {ipn}'})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('get_symbol_libraries')
def handle_get_symbol_libraries():
try:
libraries = get_symbol_libraries()
emit('symbol_libraries_result', {'libraries': libraries})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('get_symbols_in_library')
def handle_get_symbols_in_library(data):
try:
library_path = data.get('library_path', '')
if not library_path or not os.path.exists(library_path):
emit('library_error', {'error': 'Invalid library path'})
return
symbols = parse_kicad_symbol_file(library_path)
emit('symbols_in_library_result', {'symbols': symbols})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('render_symbol')
def handle_render_symbol(data):
try:
library_path = data.get('library_path', '')
symbol_name = data.get('symbol_name', '')
if not library_path or not symbol_name:
emit('library_error', {'error': 'Missing library path or symbol name'})
return
if not os.path.exists(library_path):
emit('library_error', {'error': 'Library file not found'})
return
symbol_block = extract_symbol_graphics(library_path, symbol_name)
svg = render_symbol_to_svg(symbol_block)
if svg:
emit('symbol_render_result', {'svg': svg, 'symbol_name': symbol_name})
else:
emit('library_error', {'error': f'Could not render symbol {symbol_name}'})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('get_footprint_libraries')
def handle_get_footprint_libraries():
try:
libraries = get_footprint_libraries()
emit('footprint_libraries_result', {'libraries': libraries})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('get_footprints_in_library')
def handle_get_footprints_in_library(data):
try:
library_path = data.get('library_path', '')
if not library_path or not os.path.exists(library_path):
emit('library_error', {'error': 'Invalid library path'})
return
footprints = get_footprints_in_library(library_path)
emit('footprints_in_library_result', {'footprints': footprints})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('render_footprint')
def handle_render_footprint(data):
try:
library_path = data.get('library_path', '')
footprint_name = data.get('footprint_name', '')
if not library_path or not footprint_name:
emit('library_error', {'error': 'Missing library path or footprint name'})
return
if not os.path.exists(library_path):
emit('library_error', {'error': 'Library directory not found'})
return
footprint_file = os.path.join(library_path, f'{footprint_name}.kicad_mod')
if not os.path.exists(footprint_file):
emit('library_error', {'error': f'Footprint file not found: {footprint_name}.kicad_mod'})
return
svg = render_footprint_to_svg(footprint_file)
if svg:
emit('footprint_render_result', {'svg': svg, 'footprint_name': footprint_name})
else:
emit('library_error', {'error': f'Could not render footprint {footprint_name}'})
except Exception as e:
import traceback
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('get_footprint_model')
def handle_get_footprint_model(data):
"""Extract and return embedded 3D model from a footprint."""
try:
library_path = data.get('library_path', '')
footprint_name = data.get('footprint_name', '')
if not library_path or not footprint_name:
emit('model_result', {'has_model': False})
return
if not os.path.exists(library_path):
emit('model_result', {'has_model': False})
return
# Extract embedded model
model_data = extract_embedded_model(library_path, footprint_name)
if model_data:
# Decode base64 to get the actual file data
import base64
import zstandard as zstd
try:
# Decode base64
compressed_data = base64.b64decode(model_data['data'])
# Decompress (KiCad uses zstd compression)
dctx = zstd.ZstdDecompressor()
decompressed_data = dctx.decompress(compressed_data)
# Re-encode as base64 for transmission
model_base64 = base64.b64encode(decompressed_data).decode('ascii')
emit('model_result', {
'has_model': True,
'name': model_data['name'],
'type': model_data['type'],
'data': model_base64,
'format': 'step' # Assuming STEP format
})
except Exception as e:
print(f"Error decoding model data: {e}")
import traceback
traceback.print_exc()
emit('model_result', {'has_model': False})
else:
emit('model_result', {'has_model': False})
except Exception as e:
import traceback
print(f"Error extracting model: {traceback.format_exc()}")
emit('model_result', {'has_model': False})
# ---------------------------------------------------------------------------
# BOM Generation
# ---------------------------------------------------------------------------
@socketio.on('generate_bom')
def handle_generate_bom():
try:
manager = get_variant_manager()
if not manager:
emit('bom_error', {'error': 'No project loaded'})
return
schematic_file = app_args.get('Schematic File', '')
board_file = app_args.get('Board File', '')
project_name = app_args.get('Project Name', 'project')
project_dir = app_args.get('Project Dir', '')
if not schematic_file:
emit('bom_error', {'error': 'No schematic file specified'})
return
# Get active variant and DNP parts
active_variant = manager.get_active_variant()
dnp_uuids = manager.get_dnp_parts(active_variant)
emit('bom_status', {'status': 'Generating BOMs...'})
# Call BOM generator script
bom_script_path = os.path.join(os.path.dirname(__file__), 'bom_generator.py')
# Build command with optional PCB file
cmd = [sys.executable, bom_script_path, schematic_file, project_name, active_variant, json.dumps(dnp_uuids)]
if board_file:
cmd.append(board_file)
result = subprocess.run(
cmd,
capture_output=True,
text=True
)
if result.returncode == 0:
# Create ZIP of all BOMs
output_dir = project_dir if project_dir else os.path.dirname(schematic_file)
base_name = f"{project_name}_{active_variant}"
zip_filename = f"{base_name}_BOMs.zip"
zip_path = os.path.join(output_dir, zip_filename)
with zipfile.ZipFile(zip_path, 'w') as zipf:
# Add all BOM files
bom_files = [
f"{base_name}_BOM.xlsx",
f"{base_name}_Not_Populated.csv",
f"{base_name}_BOM_Top.xlsx",
f"{base_name}_BOM_Bottom.xlsx"
]
for bom_file in bom_files:
full_path = os.path.join(output_dir, bom_file)
if os.path.exists(full_path):
zipf.write(full_path, bom_file)
emit('bom_complete', {'path': zip_path, 'filename': zip_filename})
else:
emit('bom_error', {'error': f'BOM generation failed: {result.stderr}'})
except Exception as e:
import traceback
emit('bom_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@socketio.on('build_all')
def handle_build_all():
"""Generate all manufacturing outputs: PDFs, Gerbers, Drill, ODB++, STEP, BOMs"""
try:
kicad_cli = app_args.get('Kicad Cli', '')
sch_file = app_args.get('Schematic File', '')
board_file = app_args.get('Board File', '')
project_dir = app_args.get('Project Dir', '')
project_name = app_args.get('Project Name', 'project')
variant = app_args.get('Variant', 'default')
if not kicad_cli or not sch_file or not board_file:
emit('build_error', {'error': 'Schematic or board file not configured'})
return
if not os.path.exists(sch_file):
emit('build_error', {'error': f'Schematic file not found: {sch_file}'})
return
if not os.path.exists(board_file):
emit('build_error', {'error': f'Board file not found: {board_file}'})
return
# Create output directory
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_dir = os.path.join(project_dir, f'manufacturing_{timestamp}')
os.makedirs(output_dir, exist_ok=True)
# ===== STEP 1: Generate Schematic PDF =====
emit('build_progress', {'percent': 5, 'status': 'Generating schematic PDF...', 'message': 'Starting schematic PDF generation'})
schematics_dir = os.path.join(output_dir, 'schematics')
os.makedirs(schematics_dir, exist_ok=True)
sch_pdf_path = os.path.join(schematics_dir, f'{project_name}_schematic.pdf')
cmd = [kicad_cli, 'sch', 'export', 'pdf', sch_file, '-o', sch_pdf_path]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
emit('build_error', {'error': f'Schematic PDF generation failed: {result.stderr}'})
return
# ===== STEP 2: Generate Board Layer PDFs =====
emit('build_progress', {'percent': 10, 'status': 'Generating board layer PDFs...', 'message': 'Schematic PDF complete'})
board_dir = os.path.join(output_dir, 'board')
os.makedirs(board_dir, exist_ok=True)
temp_pdf_dir = os.path.join(output_dir, 'temp_pdfs')
os.makedirs(temp_pdf_dir, exist_ok=True)
# All layers to export
layers = [
('F.Cu', 'Top_Copper'),
('B.Cu', 'Bottom_Copper'),
('F.Silkscreen', 'Top_Silkscreen'),
('B.Silkscreen', 'Bottom_Silkscreen'),
('F.Mask', 'Top_Soldermask'),
('B.Mask', 'Bottom_Soldermask'),
('F.Paste', 'Top_Paste'),
('B.Paste', 'Bottom_Paste'),
('Edge.Cuts', 'Board_Outline'),
('F.Fab', 'Top_Fabrication'),
('B.Fab', 'Bottom_Fabrication'),
]
pdf_files = []
for layer_name, file_suffix in layers:
pdf_path_temp = os.path.join(temp_pdf_dir, f'{file_suffix}_temp.pdf')
pdf_path = os.path.join(temp_pdf_dir, f'{file_suffix}.pdf')
# Include Edge.Cuts on every layer except the Edge.Cuts layer itself
if layer_name == 'Edge.Cuts':
layers_to_export = layer_name
else:
layers_to_export = f"{layer_name},Edge.Cuts"
cmd = [
kicad_cli, 'pcb', 'export', 'pdf',
board_file,
'-l', layers_to_export,
'--include-border-title',
'-o', pdf_path_temp
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
# Add layer name text overlay
layer_text = f"Layer: {file_suffix.replace('_', ' ')}"
add_text_overlay_to_pdf(pdf_path_temp, pdf_path, layer_text)
pdf_files.append(pdf_path)
# Merge all PDFs into one
if pdf_files:
emit('build_progress', {'percent': 25, 'status': 'Merging board layer PDFs...', 'message': f'Generated {len(pdf_files)} layer PDFs'})
merged_pdf_path = os.path.join(board_dir, f'{project_name}.pdf')
merger = PdfMerger()
for pdf in pdf_files:
merger.append(pdf)
merger.write(merged_pdf_path)
merger.close()
# Delete temp PDF directory
shutil.rmtree(temp_pdf_dir)
# ===== STEP 3: Generate Gerbers =====
emit('build_progress', {'percent': 35, 'status': 'Generating Gerbers...', 'message': 'Board PDFs complete, starting Gerber generation'})
gerber_dir = os.path.join(output_dir, 'gerbers')
os.makedirs(gerber_dir, exist_ok=True)
cmd = [kicad_cli, 'pcb', 'export', 'gerbers', board_file, '-o', gerber_dir]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
emit('build_error', {'error': f'Gerber generation failed: {result.stderr}'})
return
# ===== STEP 4: Generate Drill Files =====
emit('build_progress', {'percent': 50, 'status': 'Generating drill files...', 'message': 'Gerbers complete'})
cmd = [kicad_cli, 'pcb', 'export', 'drill', board_file, '-o', gerber_dir]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
emit('build_progress', {'percent': 55, 'status': 'Drill file warning', 'message': f'Drill generation had issues: {result.stderr}'})
# ===== STEP 5: Generate ODB++ =====
emit('build_progress', {'percent': 60, 'status': 'Generating ODB++...', 'message': 'Drill files complete'})
odb_dir = os.path.join(output_dir, 'odb')
os.makedirs(odb_dir, exist_ok=True)
odb_file = os.path.join(odb_dir, f'{project_name}.zip')
cmd = [kicad_cli, 'pcb', 'export', 'odb', board_file, '-o', odb_file]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
emit('build_progress', {'percent': 65, 'status': 'ODB++ generation skipped', 'message': 'ODB++ not available, continuing...'})
else:
emit('build_progress', {'percent': 65, 'status': 'ODB++ complete', 'message': 'ODB++ generation successful'})
# ===== STEP 6: Export STEP =====
emit('build_progress', {'percent': 70, 'status': 'Exporting STEP model...', 'message': 'Starting 3D model export'})
step_file = os.path.join(output_dir, f'{project_name}.step')
cmd = [kicad_cli, 'pcb', 'export', 'step', board_file, '-o', step_file, '--subst-models']
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
emit('build_progress', {'percent': 75, 'status': 'STEP export failed', 'message': f'STEP export error: {result.stderr}'})
else:
emit('build_progress', {'percent': 75, 'status': 'STEP export complete', 'message': 'STEP model generated'})
# ===== STEP 7: Generate BOMs =====
emit('build_progress', {'percent': 80, 'status': 'Generating BOMs...', 'message': 'Starting BOM generation'})
bom_script = os.path.join(os.path.dirname(__file__), 'bom_generator.py')
# BOM generator expects: schematic_file, project_name, variant_name, [dnp_uuids_json], [pcb_file]
cmd = [sys.executable, bom_script, sch_file, project_name, variant, '[]']
if board_file:
cmd.append(board_file)
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
emit('build_progress', {'percent': 90, 'status': 'BOM generation failed', 'message': f'BOM error: {result.stderr}'})
else:
# Move BOM files from schematic directory to output directory
bom_base_name = f'{project_name}_{variant}'
bom_files = [
f'{bom_base_name}_BOM.xlsx',
f'{bom_base_name}_Not_Populated.csv',
f'{bom_base_name}_BOM_Top.xlsx',
f'{bom_base_name}_BOM_Bottom.xlsx'
]
for bom_file in bom_files:
src_path = os.path.join(project_dir, bom_file)
if os.path.exists(src_path):
dst_path = os.path.join(output_dir, bom_file)
shutil.move(src_path, dst_path)
emit('build_progress', {'percent': 90, 'status': 'BOMs complete', 'message': 'BOM files generated'})
# ===== STEP 8: Create ZIP Package =====
emit('build_progress', {'percent': 95, 'status': 'Packaging outputs...', 'message': 'Creating ZIP archive'})
zip_filename = f'{project_name}_manufacturing_{timestamp}.zip'
zip_path = os.path.join(project_dir, zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Add all files from output directory
for root, dirs, files in os.walk(output_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, output_dir)
zipf.write(file_path, arcname)
emit('build_progress', {'percent': 100, 'status': 'Build complete!', 'message': f'Package ready: {zip_filename}'})
emit('build_complete', {'download_url': f'/download_build/{zip_filename}', 'filename': zip_filename})
except Exception as e:
import traceback
emit('build_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
@app.route('/download_build/<filename>')
def download_build(filename):
"""Serve the manufacturing output ZIP file"""
sch_file = app_args.get('Schematic File', '')
if not sch_file:
return "Configuration error", 500
project_dir = os.path.dirname(sch_file)
file_path = os.path.join(project_dir, filename)
if not os.path.exists(file_path):
return "File not found", 404
return send_file(file_path, as_attachment=True, download_name=filename)
def shutdown_server():
print("Server stopped")
os._exit(0)
def parse_args(args):
"""Parse command line arguments into a dictionary"""
parsed = {'executable': args[0] if args else ''}
i = 1
while i < len(args):
if args[i].startswith('--'):
key = args[i][2:].replace('-', ' ').title()
if i + 1 < len(args) and not args[i + 1].startswith('--'):
parsed[key] = args[i + 1]
i += 2
else:
parsed[key] = 'true'
i += 1
else:
i += 1
return parsed
if __name__ == '__main__':
# Load configuration
load_config()
# Parse arguments
app_args = parse_args(sys.argv)
# Open browser after short delay
def open_browser():
time.sleep(1.5)
webbrowser.open('http://127.0.0.1:5000')
threading.Thread(target=open_browser, daemon=True).start()
# Run the app
print("Starting Flask app...")
socketio.run(app, debug=False, host='127.0.0.1', port=5000)