- Add parts library browser with ODBC database search - Implement hierarchical BOM generation with PCB side detection - Add STEP export and PCB rendering functionality - Adjust page width to 80% up to 1536px max - Add library-only mode for standalone library browsing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1203 lines
43 KiB
Python
1203 lines
43 KiB
Python
import sys
|
|
import webbrowser
|
|
import threading
|
|
import subprocess
|
|
import os
|
|
import zipfile
|
|
import tempfile
|
|
import shutil
|
|
import json
|
|
from pathlib import Path
|
|
from flask import Flask, render_template, request, send_file, jsonify
|
|
from flask_socketio import SocketIO, emit
|
|
import time
|
|
from PyPDF2 import PdfMerger
|
|
from variant_manager import VariantManager
|
|
import pyodbc
|
|
|
|
app = Flask(__name__)
|
|
app.config['SECRET_KEY'] = 'secret!'
|
|
socketio = SocketIO(app, cors_allowed_origins="*")
|
|
|
|
# Store arguments
|
|
app_args = {}
|
|
connected_clients = set()
|
|
heartbeat_timeout = 5 # seconds
|
|
|
|
# Configuration
|
|
config_file = 'config.json'
|
|
app_config = {}
|
|
|
|
def load_config():
|
|
"""Load configuration from file"""
|
|
global app_config
|
|
if os.path.exists(config_file):
|
|
with open(config_file, 'r') as f:
|
|
app_config = json.load(f)
|
|
else:
|
|
app_config = {
|
|
'parts_spreadsheet_path': ''
|
|
}
|
|
return app_config
|
|
|
|
def save_config():
|
|
"""Save configuration to file"""
|
|
with open(config_file, 'w') as f:
|
|
json.dump(app_config, f, indent=2)
|
|
|
|
@app.route('/')
|
|
def index():
|
|
# Reconstruct the command line that invoked this app
|
|
cmd_parts = [sys.argv[0]]
|
|
for i in range(1, len(sys.argv)):
|
|
arg = sys.argv[i]
|
|
# Quote arguments with spaces
|
|
if ' ' in arg:
|
|
cmd_parts.append(f'"{arg}"')
|
|
else:
|
|
cmd_parts.append(arg)
|
|
invocation_cmd = ' '.join(cmd_parts)
|
|
|
|
# Check if called with no arguments (library-only mode)
|
|
library_only_mode = len(sys.argv) == 1 or not app_args
|
|
|
|
return render_template('index.html', args=app_args, invocation_cmd=invocation_cmd, library_only_mode=library_only_mode)
|
|
|
|
@socketio.on('connect')
|
|
def handle_connect():
|
|
connected_clients.add(request.sid)
|
|
print(f"Client connected: {request.sid}")
|
|
|
|
@socketio.on('disconnect')
|
|
def handle_disconnect():
|
|
connected_clients.discard(request.sid)
|
|
print(f"Client disconnected: {request.sid}")
|
|
|
|
# Shutdown if no clients connected
|
|
if not connected_clients:
|
|
print("No clients connected. Shutting down...")
|
|
threading.Timer(1.0, shutdown_server).start()
|
|
|
|
@socketio.on('heartbeat')
|
|
def handle_heartbeat():
|
|
emit('heartbeat_ack')
|
|
|
|
@socketio.on('generate_pdf')
|
|
def handle_generate_pdf():
|
|
try:
|
|
kicad_cli = app_args.get('Kicad Cli', '')
|
|
schematic_file = app_args.get('Schematic File', '')
|
|
board_file = app_args.get('Board File', '')
|
|
project_dir = app_args.get('Project Dir', '')
|
|
project_name = app_args.get('Project Name', 'project')
|
|
|
|
if not kicad_cli:
|
|
emit('pdf_error', {'error': 'Missing kicad-cli argument'})
|
|
return
|
|
|
|
# Create temporary directory for PDFs
|
|
temp_dir = tempfile.mkdtemp()
|
|
schematics_dir = os.path.join(temp_dir, 'schematics')
|
|
board_dir = os.path.join(temp_dir, 'board')
|
|
os.makedirs(schematics_dir, exist_ok=True)
|
|
os.makedirs(board_dir, exist_ok=True)
|
|
|
|
# Generate schematic PDF
|
|
if schematic_file:
|
|
emit('pdf_status', {'status': 'Generating schematic PDF...'})
|
|
sch_pdf_path = os.path.join(schematics_dir, f'{project_name}_schematic.pdf')
|
|
cmd = [kicad_cli, 'sch', 'export', 'pdf', schematic_file, '-o', sch_pdf_path]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
if result.returncode != 0:
|
|
shutil.rmtree(temp_dir)
|
|
emit('pdf_error', {'error': f'Schematic PDF failed: {result.stderr}'})
|
|
return
|
|
|
|
# Generate board layer PDFs - one per layer, then merge
|
|
if board_file:
|
|
emit('pdf_status', {'status': 'Generating board layer PDFs...'})
|
|
|
|
# All layers to export
|
|
layers = [
|
|
('F.Cu', 'Top_Copper'),
|
|
('B.Cu', 'Bottom_Copper'),
|
|
('F.Silkscreen', 'Top_Silkscreen'),
|
|
('B.Silkscreen', 'Bottom_Silkscreen'),
|
|
('F.Mask', 'Top_Soldermask'),
|
|
('B.Mask', 'Bottom_Soldermask'),
|
|
('F.Paste', 'Top_Paste'),
|
|
('B.Paste', 'Bottom_Paste'),
|
|
('Edge.Cuts', 'Board_Outline'),
|
|
('F.Fab', 'Top_Fabrication'),
|
|
('B.Fab', 'Bottom_Fabrication'),
|
|
]
|
|
|
|
temp_pdf_dir = os.path.join(temp_dir, 'temp_pdfs')
|
|
os.makedirs(temp_pdf_dir, exist_ok=True)
|
|
pdf_files = []
|
|
|
|
for layer_name, file_suffix in layers:
|
|
pdf_path = os.path.join(temp_pdf_dir, f'{file_suffix}.pdf')
|
|
|
|
# Include Edge.Cuts on every layer except the Edge.Cuts layer itself
|
|
if layer_name == 'Edge.Cuts':
|
|
layers_to_export = layer_name
|
|
else:
|
|
layers_to_export = f"{layer_name},Edge.Cuts"
|
|
|
|
cmd = [
|
|
kicad_cli, 'pcb', 'export', 'pdf',
|
|
board_file,
|
|
'-l', layers_to_export,
|
|
'--include-border-title',
|
|
'-o', pdf_path
|
|
]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
if result.returncode == 0:
|
|
pdf_files.append(pdf_path)
|
|
else:
|
|
print(f"Warning: Failed to generate {layer_name}: {result.stderr}")
|
|
|
|
# Merge all PDFs into one
|
|
if pdf_files:
|
|
emit('pdf_status', {'status': 'Merging board layer PDFs...'})
|
|
merged_pdf_path = os.path.join(board_dir, f'{project_name}.pdf')
|
|
merger = PdfMerger()
|
|
|
|
for pdf in pdf_files:
|
|
merger.append(pdf)
|
|
|
|
merger.write(merged_pdf_path)
|
|
merger.close()
|
|
|
|
# Delete temp PDF directory
|
|
shutil.rmtree(temp_pdf_dir)
|
|
|
|
# Create ZIP file
|
|
emit('pdf_status', {'status': 'Creating ZIP archive...'})
|
|
zip_filename = f'{project_name}_PDFs.zip'
|
|
zip_path = os.path.join(project_dir if project_dir else temp_dir, zip_filename)
|
|
|
|
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|
for root, dirs, files in os.walk(temp_dir):
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
arcname = os.path.relpath(file_path, temp_dir)
|
|
zipf.write(file_path, arcname)
|
|
|
|
# Clean up temp directory
|
|
shutil.rmtree(temp_dir)
|
|
|
|
emit('pdf_complete', {'path': zip_path, 'filename': zip_filename})
|
|
|
|
except Exception as e:
|
|
emit('pdf_error', {'error': str(e)})
|
|
|
|
@socketio.on('generate_gerbers')
|
|
def handle_generate_gerbers():
|
|
try:
|
|
kicad_cli = app_args.get('Kicad Cli', '')
|
|
board_file = app_args.get('Board File', '')
|
|
project_dir = app_args.get('Project Dir', '')
|
|
project_name = app_args.get('Project Name', 'project')
|
|
|
|
if not kicad_cli or not board_file:
|
|
emit('gerber_error', {'error': 'Missing kicad-cli or board-file arguments'})
|
|
return
|
|
|
|
# Create temporary directory for gerbers
|
|
temp_dir = tempfile.mkdtemp()
|
|
gerber_dir = os.path.join(temp_dir, 'gerbers')
|
|
os.makedirs(gerber_dir, exist_ok=True)
|
|
|
|
# Generate gerbers
|
|
emit('gerber_status', {'status': 'Generating gerber files...'})
|
|
cmd = [kicad_cli, 'pcb', 'export', 'gerbers', board_file, '-o', gerber_dir]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode != 0:
|
|
shutil.rmtree(temp_dir)
|
|
emit('gerber_error', {'error': f'Gerber generation failed: {result.stderr}'})
|
|
return
|
|
|
|
# Generate drill files
|
|
emit('gerber_status', {'status': 'Generating drill files...'})
|
|
cmd = [kicad_cli, 'pcb', 'export', 'drill', board_file, '-o', gerber_dir]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode != 0:
|
|
print(f"Warning: Drill file generation failed: {result.stderr}")
|
|
|
|
# Generate ODB++ files
|
|
emit('gerber_status', {'status': 'Generating ODB++ files...'})
|
|
odb_dir = os.path.join(temp_dir, 'odb')
|
|
os.makedirs(odb_dir, exist_ok=True)
|
|
odb_file = os.path.join(odb_dir, f'{project_name}.zip')
|
|
cmd = [kicad_cli, 'pcb', 'export', 'odb', board_file, '-o', odb_file]
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode != 0:
|
|
print(f"Warning: ODB++ generation failed: {result.stderr}")
|
|
|
|
# Create ZIP file
|
|
emit('gerber_status', {'status': 'Creating ZIP archive...'})
|
|
zip_filename = f'{project_name}_fab.zip'
|
|
zip_path = os.path.join(project_dir if project_dir else temp_dir, zip_filename)
|
|
|
|
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|
# Add gerbers folder
|
|
for root, dirs, files in os.walk(gerber_dir):
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
arcname = os.path.join('gerbers', os.path.basename(file_path))
|
|
zipf.write(file_path, arcname)
|
|
|
|
# Add odb folder
|
|
for root, dirs, files in os.walk(odb_dir):
|
|
for file in files:
|
|
file_path = os.path.join(root, file)
|
|
arcname = os.path.join('odb', os.path.relpath(file_path, odb_dir))
|
|
zipf.write(file_path, arcname)
|
|
|
|
# Clean up temp directory
|
|
shutil.rmtree(temp_dir)
|
|
|
|
emit('gerber_complete', {'path': zip_path, 'filename': zip_filename})
|
|
|
|
except Exception as e:
|
|
emit('gerber_error', {'error': str(e)})
|
|
|
|
@socketio.on('export_step')
|
|
def handle_export_step():
|
|
try:
|
|
kicad_cli = app_args.get('Kicad Cli', '')
|
|
board_file = app_args.get('Board File', '')
|
|
project_dir = app_args.get('Project Dir', '')
|
|
project_name = app_args.get('Project Name', 'project')
|
|
|
|
if not kicad_cli or not board_file:
|
|
emit('step_error', {'error': 'Missing kicad-cli or board-file arguments'})
|
|
return
|
|
|
|
# Create output filename
|
|
step_filename = f'{project_name}.step'
|
|
step_path = os.path.join(project_dir if project_dir else os.path.dirname(board_file), step_filename)
|
|
|
|
# Generate STEP file
|
|
emit('step_status', {'status': 'Exporting PCB to STEP format...'})
|
|
cmd = [kicad_cli, 'pcb', 'export', 'step', board_file, '-o', step_path]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode != 0:
|
|
emit('step_error', {'error': f'STEP export failed: {result.stderr}'})
|
|
return
|
|
|
|
if not os.path.exists(step_path):
|
|
emit('step_error', {'error': 'STEP file was not created'})
|
|
return
|
|
|
|
emit('step_complete', {'path': step_path, 'filename': step_filename})
|
|
|
|
except Exception as e:
|
|
emit('step_error', {'error': str(e)})
|
|
|
|
@socketio.on('render_pcb')
|
|
def handle_render_pcb():
|
|
try:
|
|
kicad_cli = app_args.get('Kicad Cli', '')
|
|
board_file = app_args.get('Board File', '')
|
|
project_dir = app_args.get('Project Dir', '')
|
|
project_name = app_args.get('Project Name', 'project')
|
|
|
|
if not kicad_cli or not board_file:
|
|
emit('render_error', {'error': 'Missing kicad-cli or board-file arguments'})
|
|
return
|
|
|
|
# Create output filename
|
|
render_filename = f'{project_name}_iso_view.png'
|
|
render_path = os.path.join(project_dir if project_dir else os.path.dirname(board_file), render_filename)
|
|
|
|
# Render PCB with isometric view
|
|
emit('render_status', {'status': 'Rendering PCB image...'})
|
|
cmd = [
|
|
kicad_cli, 'pcb', 'render', board_file,
|
|
'--rotate', '25,0,45',
|
|
'-o', render_path
|
|
]
|
|
|
|
result = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
if result.returncode != 0:
|
|
error_msg = f'PCB render failed:\nCommand: {" ".join(cmd)}\nStderr: {result.stderr}\nStdout: {result.stdout}'
|
|
emit('render_error', {'error': error_msg})
|
|
return
|
|
|
|
if not os.path.exists(render_path):
|
|
emit('render_error', {'error': 'Rendered image was not created'})
|
|
return
|
|
|
|
emit('render_complete', {'path': render_path, 'filename': render_filename})
|
|
|
|
except Exception as e:
|
|
emit('render_error', {'error': str(e)})
|
|
|
|
@socketio.on('sync_libraries')
|
|
def handle_sync_libraries():
|
|
try:
|
|
emit('sync_status', {'status': 'Starting library synchronization...'})
|
|
|
|
# Check if UM_KICAD is set
|
|
um_kicad = os.environ.get('UM_KICAD')
|
|
if not um_kicad:
|
|
emit('sync_error', {'error': 'UM_KICAD environment variable is not set in the Flask app environment'})
|
|
return
|
|
|
|
emit('sync_status', {'status': f'UM_KICAD is set to: {um_kicad}'})
|
|
|
|
# Run the add_libraries.py script
|
|
script_path = os.path.join(os.path.dirname(__file__), 'add_libraries.py')
|
|
|
|
if not os.path.exists(script_path):
|
|
emit('sync_error', {'error': 'add_libraries.py script not found'})
|
|
return
|
|
|
|
result = subprocess.run(
|
|
[sys.executable, script_path],
|
|
capture_output=True,
|
|
text=True,
|
|
env=os.environ.copy()
|
|
)
|
|
|
|
output = result.stdout + result.stderr
|
|
|
|
if result.returncode == 0:
|
|
emit('sync_complete', {'output': output})
|
|
else:
|
|
emit('sync_error', {'error': f'Sync failed:\n{output}'})
|
|
|
|
except Exception as e:
|
|
emit('sync_error', {'error': str(e)})
|
|
|
|
@socketio.on('sync_database')
|
|
def handle_sync_database():
|
|
try:
|
|
emit('db_sync_status', {'status': 'Starting database synchronization...'})
|
|
|
|
# Get the parts spreadsheet path from config
|
|
parts_spreadsheet = app_config.get('parts_spreadsheet_path', '')
|
|
if not parts_spreadsheet:
|
|
emit('db_sync_error', {'error': 'Parts spreadsheet path not configured. Please set it in Settings.'})
|
|
return
|
|
|
|
if not os.path.exists(parts_spreadsheet):
|
|
emit('db_sync_error', {'error': f'Parts spreadsheet not found at: {parts_spreadsheet}'})
|
|
return
|
|
|
|
emit('db_sync_status', {'status': f'Using parts spreadsheet: {parts_spreadsheet}'})
|
|
|
|
# Run the gen_resistors_db.py script
|
|
script_path = os.path.join(os.path.dirname(__file__), 'gen_resistors_db.py')
|
|
|
|
if not os.path.exists(script_path):
|
|
emit('db_sync_error', {'error': 'gen_resistors_db.py script not found'})
|
|
return
|
|
|
|
result = subprocess.run(
|
|
[sys.executable, script_path, parts_spreadsheet],
|
|
capture_output=True,
|
|
text=True,
|
|
env=os.environ.copy()
|
|
)
|
|
|
|
output = result.stdout + result.stderr
|
|
|
|
if result.returncode == 0:
|
|
emit('db_sync_complete', {'output': output})
|
|
else:
|
|
emit('db_sync_error', {'error': f'Database sync failed:\n{output}'})
|
|
|
|
except Exception as e:
|
|
emit('db_sync_error', {'error': str(e)})
|
|
|
|
@socketio.on('init_user')
|
|
def handle_init_user():
|
|
try:
|
|
emit('init_status', {'status': 'Starting user environment initialization...'})
|
|
|
|
# Check if UM_KICAD is set
|
|
um_kicad = os.environ.get('UM_KICAD')
|
|
if not um_kicad:
|
|
emit('init_error', {'error': 'UM_KICAD environment variable is not set'})
|
|
return
|
|
|
|
emit('init_status', {'status': f'UM_KICAD: {um_kicad}'})
|
|
|
|
# Run the init_user.py script
|
|
script_path = os.path.join(os.path.dirname(__file__), 'init_user.py')
|
|
|
|
if not os.path.exists(script_path):
|
|
emit('init_error', {'error': 'init_user.py script not found'})
|
|
return
|
|
|
|
result = subprocess.run(
|
|
[sys.executable, script_path],
|
|
capture_output=True,
|
|
text=True,
|
|
env=os.environ.copy()
|
|
)
|
|
|
|
output = result.stdout + result.stderr
|
|
|
|
if result.returncode == 0:
|
|
emit('init_complete', {'output': output})
|
|
else:
|
|
emit('init_error', {'error': f'Initialization failed:\n{output}'})
|
|
|
|
except Exception as e:
|
|
emit('init_error', {'error': str(e)})
|
|
|
|
@app.route('/download/<path:filename>')
|
|
def download_file(filename):
|
|
project_dir = app_args.get('Project Dir', '')
|
|
file_path = os.path.join(project_dir, filename)
|
|
if os.path.exists(file_path):
|
|
return send_file(file_path, as_attachment=True)
|
|
return "File not found", 404
|
|
|
|
@app.route('/config', methods=['GET', 'POST'])
|
|
def config():
|
|
if request.method == 'POST':
|
|
data = request.get_json()
|
|
app_config['parts_spreadsheet_path'] = data.get('parts_spreadsheet_path', '')
|
|
save_config()
|
|
return jsonify({'status': 'success', 'config': app_config})
|
|
else:
|
|
return jsonify(app_config)
|
|
|
|
@app.route('/variants')
|
|
def variants_page():
|
|
return render_template('variants.html')
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Variant Management Socket Handlers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_variant_manager():
|
|
"""Get VariantManager instance for current project"""
|
|
schematic_file = app_args.get('Schematic File', '')
|
|
if not schematic_file or not os.path.exists(schematic_file):
|
|
return None
|
|
return VariantManager(schematic_file)
|
|
|
|
def get_all_schematic_files(root_schematic):
|
|
"""Get all schematic files in a hierarchical design"""
|
|
from pathlib import Path
|
|
|
|
root_path = Path(root_schematic)
|
|
if not root_path.exists():
|
|
return [root_schematic]
|
|
|
|
schematic_files = [str(root_path)]
|
|
schematic_dir = root_path.parent
|
|
|
|
try:
|
|
with open(root_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
for line in content.split('\n'):
|
|
if '(property "Sheetfile"' in line:
|
|
parts = line.split('"')
|
|
if len(parts) >= 4:
|
|
sheet_file = parts[3]
|
|
sheet_path = schematic_dir / sheet_file
|
|
if sheet_path.exists():
|
|
sub_sheets = get_all_schematic_files(str(sheet_path))
|
|
for sub in sub_sheets:
|
|
if sub not in schematic_files:
|
|
schematic_files.append(sub)
|
|
except:
|
|
pass
|
|
|
|
return schematic_files
|
|
|
|
|
|
def get_all_parts_from_schematic():
|
|
"""Get all component references, values, and UUIDs from all schematics (including hierarchical sheets)"""
|
|
schematic_file = app_args.get('Schematic File', '')
|
|
if not schematic_file or not os.path.exists(schematic_file):
|
|
return []
|
|
|
|
# Get all schematic files
|
|
all_schematics = get_all_schematic_files(schematic_file)
|
|
all_parts = {} # uuid -> {reference, value}
|
|
|
|
for sch_file in all_schematics:
|
|
try:
|
|
with open(sch_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
lines = content.split('\n')
|
|
in_symbol = False
|
|
current_uuid = None
|
|
current_ref = None
|
|
current_value = None
|
|
current_lib_id = None
|
|
|
|
for line in lines:
|
|
stripped = line.strip()
|
|
|
|
# Detect start of symbol
|
|
if stripped.startswith('(symbol'):
|
|
in_symbol = True
|
|
current_uuid = None
|
|
current_ref = None
|
|
current_value = None
|
|
current_lib_id = None
|
|
|
|
# Detect end of symbol
|
|
elif in_symbol and stripped == ')':
|
|
# Save the part if we have all the info, excluding power symbols
|
|
is_power = current_lib_id and 'power:' in current_lib_id
|
|
is_power = is_power or (current_ref and current_ref.startswith('#'))
|
|
|
|
if current_uuid and current_ref and not is_power and len(current_ref) > 1:
|
|
all_parts[current_uuid] = {
|
|
'reference': current_ref,
|
|
'value': current_value or ''
|
|
}
|
|
in_symbol = False
|
|
|
|
# Extract lib_id to check for power symbols
|
|
elif in_symbol and '(lib_id' in stripped:
|
|
lib_parts = line.split('"')
|
|
if len(lib_parts) >= 2:
|
|
current_lib_id = lib_parts[1]
|
|
|
|
# Extract UUID
|
|
elif in_symbol and '(uuid' in stripped:
|
|
uuid_parts = line.split('"')
|
|
if len(uuid_parts) >= 2:
|
|
current_uuid = uuid_parts[1]
|
|
|
|
# Extract reference - format: (property "Reference" "U1" ...
|
|
elif in_symbol and '(property "Reference"' in line:
|
|
try:
|
|
start = line.find('"Reference"') + len('"Reference"')
|
|
remainder = line[start:]
|
|
quote_start = remainder.find('"')
|
|
if quote_start != -1:
|
|
quote_end = remainder.find('"', quote_start + 1)
|
|
if quote_end != -1:
|
|
current_ref = remainder[quote_start + 1:quote_end]
|
|
except:
|
|
pass
|
|
|
|
# Extract value - format: (property "Value" "LM358" ...
|
|
elif in_symbol and '(property "Value"' in line:
|
|
try:
|
|
start = line.find('"Value"') + len('"Value"')
|
|
remainder = line[start:]
|
|
quote_start = remainder.find('"')
|
|
if quote_start != -1:
|
|
quote_end = remainder.find('"', quote_start + 1)
|
|
if quote_end != -1:
|
|
current_value = remainder[quote_start + 1:quote_end]
|
|
except:
|
|
pass
|
|
|
|
except Exception as e:
|
|
print(f"Error reading schematic {sch_file}: {e}")
|
|
|
|
return [{'uuid': uuid, 'reference': data['reference'], 'value': data['value']}
|
|
for uuid, data in sorted(all_parts.items(), key=lambda x: x[1]['reference'])]
|
|
|
|
@socketio.on('get_variants')
|
|
def handle_get_variants():
|
|
try:
|
|
manager = get_variant_manager()
|
|
if not manager:
|
|
emit('variant_error', {'error': 'No project loaded'})
|
|
return
|
|
|
|
all_parts = get_all_parts_from_schematic()
|
|
|
|
emit('variants_data', {
|
|
'project_name': manager.project_name,
|
|
'variants': manager.get_variants(),
|
|
'active_variant': manager.get_active_variant(),
|
|
'all_parts': all_parts # Now includes uuid, reference, and value
|
|
})
|
|
except Exception as e:
|
|
emit('variant_error', {'error': str(e)})
|
|
|
|
@socketio.on('create_variant')
|
|
def handle_create_variant(data):
|
|
try:
|
|
manager = get_variant_manager()
|
|
if not manager:
|
|
emit('variant_error', {'error': 'No project loaded'})
|
|
return
|
|
|
|
name = data.get('name', '')
|
|
description = data.get('description', '')
|
|
based_on = data.get('based_on', None)
|
|
|
|
if not name:
|
|
emit('variant_error', {'error': 'Variant name required'})
|
|
return
|
|
|
|
success = manager.create_variant(name, description, based_on)
|
|
if success:
|
|
emit('variant_updated', {'message': f'Variant "{name}" created'})
|
|
else:
|
|
emit('variant_error', {'error': f'Variant "{name}" already exists'})
|
|
except Exception as e:
|
|
emit('variant_error', {'error': str(e)})
|
|
|
|
@socketio.on('delete_variant')
|
|
def handle_delete_variant(data):
|
|
try:
|
|
manager = get_variant_manager()
|
|
if not manager:
|
|
emit('variant_error', {'error': 'No project loaded'})
|
|
return
|
|
|
|
name = data.get('name', '')
|
|
success = manager.delete_variant(name)
|
|
|
|
if success:
|
|
emit('variant_updated', {'message': f'Variant "{name}" deleted'})
|
|
else:
|
|
emit('variant_error', {'error': f'Cannot delete variant "{name}"'})
|
|
except Exception as e:
|
|
emit('variant_error', {'error': str(e)})
|
|
|
|
@socketio.on('activate_variant')
|
|
def handle_activate_variant(data):
|
|
try:
|
|
import pygetwindow as gw
|
|
import pyautogui
|
|
import time
|
|
import win32gui
|
|
import win32con
|
|
|
|
manager = get_variant_manager()
|
|
if not manager:
|
|
emit('variant_error', {'error': 'No project loaded'})
|
|
return
|
|
|
|
name = data.get('name', '')
|
|
schematic_file = app_args.get('Schematic File', '')
|
|
kicad_cli = app_args.get('Kicad Cli', 'kicad-cli')
|
|
|
|
# Step 1: Save and close schematic window
|
|
emit('variant_status', {'status': 'Looking for KiCad schematic window...'})
|
|
|
|
all_windows = gw.getAllTitles()
|
|
windows = gw.getWindowsWithTitle('Schematic Editor')
|
|
|
|
window_found = False
|
|
if not windows:
|
|
schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()]
|
|
if schematic_windows:
|
|
windows = gw.getWindowsWithTitle(schematic_windows[0])
|
|
window_found = len(windows) > 0
|
|
else:
|
|
window_found = True
|
|
|
|
# If window is found, save and close it
|
|
if window_found:
|
|
window = windows[0]
|
|
emit('variant_status', {'status': f'Saving and closing: {window.title}'})
|
|
|
|
hwnd = window._hWnd
|
|
rect = win32gui.GetWindowRect(hwnd)
|
|
x, y, x2, y2 = rect
|
|
width = x2 - x
|
|
|
|
# Click to activate
|
|
click_x = x + width // 2
|
|
click_y = y + 10
|
|
pyautogui.click(click_x, click_y)
|
|
time.sleep(0.5)
|
|
|
|
# Save
|
|
pyautogui.hotkey('ctrl', 's')
|
|
time.sleep(1.0)
|
|
|
|
# Close
|
|
win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
|
|
time.sleep(1.0)
|
|
|
|
# Try Alt+F4 if still open
|
|
if win32gui.IsWindow(hwnd):
|
|
try:
|
|
rect = win32gui.GetWindowRect(hwnd)
|
|
x, y, x2, y2 = rect
|
|
click_x = x + (x2 - x) // 2
|
|
click_y = y + 10
|
|
pyautogui.click(click_x, click_y)
|
|
time.sleep(0.3)
|
|
except:
|
|
pass
|
|
pyautogui.hotkey('alt', 'F4')
|
|
time.sleep(1.0)
|
|
|
|
# Step 2: Sync current variant from schematic
|
|
current_variant = manager.get_active_variant()
|
|
emit('variant_status', {'status': f'Syncing current variant "{current_variant}"...'})
|
|
|
|
from sync_variant import sync_variant_from_schematic
|
|
try:
|
|
sync_success = sync_variant_from_schematic(schematic_file, current_variant)
|
|
if sync_success:
|
|
manager = get_variant_manager()
|
|
else:
|
|
print(f"Warning: Sync of variant '{current_variant}' failed")
|
|
except Exception as e:
|
|
print(f"Error during sync: {e}")
|
|
|
|
# Step 3: Activate new variant and apply to schematic
|
|
emit('variant_status', {'status': f'Activating variant "{name}"...'})
|
|
success = manager.set_active_variant(name)
|
|
|
|
if success:
|
|
apply_script_path = os.path.join(os.path.dirname(__file__), 'apply_variant.py')
|
|
|
|
result = subprocess.run(
|
|
[sys.executable, apply_script_path, schematic_file, name, kicad_cli],
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode != 0:
|
|
error_msg = result.stderr if result.stderr else result.stdout
|
|
emit('variant_error', {'error': f'Failed to apply variant: {error_msg}'})
|
|
return
|
|
else:
|
|
emit('variant_error', {'error': f'Variant "{name}" not found'})
|
|
return
|
|
|
|
# Step 4: Reopen schematic editor
|
|
emit('variant_status', {'status': 'Waiting 2 seconds before reopening...'})
|
|
time.sleep(2.0)
|
|
|
|
emit('variant_status', {'status': 'Reopening schematic editor...'})
|
|
|
|
kicad_bin_dir = r"C:\Program Files\KiCad\9.0\bin"
|
|
if not os.path.exists(kicad_bin_dir):
|
|
kicad_bin_dir = r"C:\Program Files\KiCad\8.0\bin"
|
|
|
|
eeschema_exe = os.path.join(kicad_bin_dir, "eeschema.exe")
|
|
|
|
if os.path.exists(eeschema_exe):
|
|
subprocess.Popen([eeschema_exe, schematic_file], shell=False)
|
|
time.sleep(2.0)
|
|
else:
|
|
emit('variant_status', {'status': f'Warning: eeschema.exe not found at {eeschema_exe}'})
|
|
|
|
emit('variant_updated', {'message': f'Activated variant "{name}" and reopened schematic'})
|
|
|
|
except ImportError as e:
|
|
emit('variant_error', {'error': f'Missing library: {str(e)}. Install: pip install pygetwindow pyautogui pywin32'})
|
|
except Exception as e:
|
|
import traceback
|
|
emit('variant_error', {'error': f'{str(e)}\n{traceback.format_exc()}'})
|
|
|
|
@socketio.on('get_variant_parts')
|
|
def handle_get_variant_parts(data):
|
|
try:
|
|
manager = get_variant_manager()
|
|
if not manager:
|
|
emit('variant_error', {'error': 'No project loaded'})
|
|
return
|
|
|
|
variant_name = data.get('variant', '')
|
|
all_parts = get_all_parts_from_schematic()
|
|
dnp_uuids = manager.get_dnp_parts(variant_name)
|
|
|
|
parts_data = []
|
|
for part in all_parts:
|
|
parts_data.append({
|
|
'uuid': part['uuid'],
|
|
'reference': part['reference'],
|
|
'value': part['value'],
|
|
'is_dnp': part['uuid'] in dnp_uuids
|
|
})
|
|
|
|
emit('variant_parts_data', {'parts': parts_data})
|
|
except Exception as e:
|
|
emit('variant_error', {'error': str(e)})
|
|
|
|
@socketio.on('set_part_dnp')
|
|
def handle_set_part_dnp(data):
|
|
try:
|
|
manager = get_variant_manager()
|
|
if not manager:
|
|
emit('variant_error', {'error': 'No project loaded'})
|
|
return
|
|
|
|
variant = data.get('variant', '')
|
|
uuid = data.get('uuid', '')
|
|
is_dnp = data.get('is_dnp', False)
|
|
|
|
success = manager.set_part_dnp(variant, uuid, is_dnp)
|
|
if success:
|
|
# Re-send updated parts list
|
|
handle_get_variant_parts({'variant': variant})
|
|
else:
|
|
emit('variant_error', {'error': 'Failed to update part'})
|
|
except Exception as e:
|
|
emit('variant_error', {'error': str(e)})
|
|
|
|
@socketio.on('sync_from_schematic')
|
|
def handle_sync_from_schematic():
|
|
try:
|
|
manager = get_variant_manager()
|
|
if not manager:
|
|
emit('variant_error', {'error': 'No project loaded'})
|
|
return
|
|
|
|
# Read DNP state from schematic and update active variant
|
|
schematic_file = app_args.get('Schematic File', '')
|
|
script_path = os.path.join(os.path.dirname(__file__), 'sync_variant.py')
|
|
|
|
result = subprocess.run(
|
|
[sys.executable, script_path, schematic_file],
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
emit('sync_complete', {'message': f'Synced from schematic:\n{result.stdout}'})
|
|
else:
|
|
emit('variant_error', {'error': f'Failed to sync: {result.stderr}'})
|
|
except Exception as e:
|
|
emit('variant_error', {'error': str(e)})
|
|
|
|
@socketio.on('test_window_interaction')
|
|
def handle_test_window_interaction():
|
|
try:
|
|
import pygetwindow as gw
|
|
import pyautogui
|
|
import time
|
|
import win32gui
|
|
import win32con
|
|
|
|
emit('window_test_status', {'status': 'Looking for KiCad schematic window...'})
|
|
|
|
# List all windows for debugging
|
|
all_windows = gw.getAllTitles()
|
|
emit('window_test_status', {'status': f'DEBUG: Found {len(all_windows)} windows total'})
|
|
|
|
# Find KiCad schematic editor window
|
|
windows = gw.getWindowsWithTitle('Schematic Editor')
|
|
emit('window_test_status', {'status': f'DEBUG: Found {len(windows)} windows with "Schematic Editor"'})
|
|
|
|
window_found = False
|
|
if not windows:
|
|
# Try alternative window title
|
|
schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()]
|
|
emit('window_test_status', {'status': f'DEBUG: Found {len(schematic_windows)} windows with "kicad" and "schematic"'})
|
|
|
|
if schematic_windows:
|
|
emit('window_test_status', {'status': f'DEBUG: Using window: {schematic_windows[0]}'})
|
|
windows = gw.getWindowsWithTitle(schematic_windows[0])
|
|
window_found = len(windows) > 0
|
|
else:
|
|
window_found = True
|
|
|
|
# If window is found, close it
|
|
if window_found:
|
|
window = windows[0]
|
|
emit('window_test_status', {'status': f'Found window: "{window.title}"'})
|
|
|
|
# Get window position and size
|
|
hwnd = window._hWnd
|
|
rect = win32gui.GetWindowRect(hwnd)
|
|
x, y, x2, y2 = rect
|
|
width = x2 - x
|
|
height = y2 - y
|
|
emit('window_test_status', {'status': f'DEBUG: Window position=({x},{y}), size=({width}x{height})'})
|
|
|
|
# Click on the window's title bar to activate it (more reliable than SetForegroundWindow)
|
|
click_x = x + width // 2
|
|
click_y = y + 10 # Title bar is usually at the top
|
|
emit('window_test_status', {'status': f'Clicking window at ({click_x}, {click_y}) to activate...'})
|
|
pyautogui.click(click_x, click_y)
|
|
time.sleep(0.5)
|
|
|
|
emit('window_test_status', {'status': 'Sending Ctrl+S (save)...'})
|
|
pyautogui.hotkey('ctrl', 's')
|
|
time.sleep(1.0)
|
|
|
|
emit('window_test_status', {'status': 'Attempting to close window...'})
|
|
|
|
# Method 1: Try WM_CLOSE message
|
|
emit('window_test_status', {'status': 'DEBUG: Sending WM_CLOSE message...'})
|
|
win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0)
|
|
time.sleep(1.0)
|
|
|
|
# Check if window still exists
|
|
if win32gui.IsWindow(hwnd):
|
|
emit('window_test_status', {'status': 'DEBUG: Window still exists after WM_CLOSE, trying to click and send Alt+F4...'})
|
|
|
|
# Click on window again to make sure it has focus
|
|
try:
|
|
rect = win32gui.GetWindowRect(hwnd)
|
|
x, y, x2, y2 = rect
|
|
click_x = x + (x2 - x) // 2
|
|
click_y = y + 10
|
|
pyautogui.click(click_x, click_y)
|
|
time.sleep(0.3)
|
|
except:
|
|
emit('window_test_status', {'status': 'DEBUG: Could not click window (may already be closed)'})
|
|
|
|
pyautogui.hotkey('alt', 'F4')
|
|
time.sleep(1.0)
|
|
|
|
# Final check
|
|
if win32gui.IsWindow(hwnd):
|
|
emit('window_test_status', {'status': 'DEBUG: Window still exists after Alt+F4 - may need manual intervention'})
|
|
else:
|
|
emit('window_test_status', {'status': 'DEBUG: Window closed successfully via Alt+F4'})
|
|
else:
|
|
emit('window_test_status', {'status': 'DEBUG: Window closed successfully via WM_CLOSE'})
|
|
else:
|
|
emit('window_test_status', {'status': 'No KiCad schematic window found - will open it'})
|
|
|
|
# Wait a couple seconds before reopening
|
|
emit('window_test_status', {'status': 'Waiting 2 seconds before reopening...'})
|
|
time.sleep(2.0)
|
|
|
|
# Reopen the schematic editor
|
|
schematic_file = app_args.get('Schematic File', '')
|
|
if not schematic_file:
|
|
emit('window_test_error', {'error': 'No schematic file specified in app arguments'})
|
|
return
|
|
|
|
emit('window_test_status', {'status': f'Relaunching schematic editor with: {schematic_file}'})
|
|
|
|
# Launch KiCad schematic editor
|
|
# The schematic editor executable is typically in the same directory as kicad.exe
|
|
import os
|
|
kicad_bin_dir = r"C:\Program Files\KiCad\9.0\bin" # Default KiCad 9 installation path
|
|
if not os.path.exists(kicad_bin_dir):
|
|
kicad_bin_dir = r"C:\Program Files\KiCad\8.0\bin" # Try KiCad 8
|
|
|
|
eeschema_exe = os.path.join(kicad_bin_dir, "eeschema.exe")
|
|
|
|
if not os.path.exists(eeschema_exe):
|
|
emit('window_test_error', {'error': f'KiCad executable not found at: {eeschema_exe}'})
|
|
return
|
|
|
|
emit('window_test_status', {'status': f'DEBUG: Launching {eeschema_exe} {schematic_file}'})
|
|
|
|
# Launch KiCad with the schematic file
|
|
result = subprocess.Popen([eeschema_exe, schematic_file], shell=False)
|
|
emit('window_test_status', {'status': f'DEBUG: Process started with PID {result.pid}'})
|
|
|
|
time.sleep(2.0)
|
|
|
|
# Verify the window opened
|
|
all_windows = gw.getAllTitles()
|
|
schematic_windows = [w for w in all_windows if 'kicad' in w.lower() and 'schematic' in w.lower()]
|
|
if schematic_windows:
|
|
emit('window_test_status', {'status': f'Successfully reopened schematic: {schematic_windows[0]}'})
|
|
else:
|
|
emit('window_test_status', {'status': 'Schematic editor launched but window not detected yet'})
|
|
|
|
emit('window_test_complete', {'message': 'Window interaction test completed!'})
|
|
except ImportError as e:
|
|
emit('window_test_error', {'error': f'Missing required library: {str(e)}. Please install: pip install pygetwindow pyautogui pywin32'})
|
|
except Exception as e:
|
|
import traceback
|
|
emit('window_test_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Library Management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_db_connection():
|
|
"""Get ODBC connection to the parts database"""
|
|
try:
|
|
# Use DSN connection as configured in KiCad database library
|
|
conn_str = "DSN=UM_KiCad_Parts;"
|
|
conn = pyodbc.connect(conn_str)
|
|
return conn
|
|
except Exception as e:
|
|
print(f"Database connection error: {e}")
|
|
return None
|
|
|
|
@socketio.on('search_parts')
|
|
def handle_search_parts(data):
|
|
try:
|
|
search_query = data.get('query', '').strip()
|
|
|
|
conn = get_db_connection()
|
|
if not conn:
|
|
emit('library_error', {'error': 'Could not connect to database'})
|
|
return
|
|
|
|
cursor = conn.cursor()
|
|
|
|
# Build search query - search across ipn, mpn, manufacturer, and description
|
|
# Column names in SQLite are lowercase
|
|
if search_query:
|
|
sql = """
|
|
SELECT ipn, mpn, manufacturer, description
|
|
FROM parts
|
|
WHERE ipn LIKE ?
|
|
OR mpn LIKE ?
|
|
OR manufacturer LIKE ?
|
|
OR description LIKE ?
|
|
ORDER BY ipn
|
|
LIMIT 100
|
|
"""
|
|
search_param = f'%{search_query}%'
|
|
cursor.execute(sql, (search_param, search_param, search_param, search_param))
|
|
else:
|
|
# No search query - return first 100 parts
|
|
sql = """
|
|
SELECT ipn, mpn, manufacturer, description
|
|
FROM parts
|
|
ORDER BY ipn
|
|
LIMIT 100
|
|
"""
|
|
cursor.execute(sql)
|
|
|
|
rows = cursor.fetchall()
|
|
|
|
parts = []
|
|
for row in rows:
|
|
parts.append({
|
|
'ipn': row.ipn if row.ipn else '',
|
|
'mpn': row.mpn if row.mpn else '',
|
|
'manufacturer': row.manufacturer if row.manufacturer else '',
|
|
'description': row.description if row.description else ''
|
|
})
|
|
|
|
cursor.close()
|
|
conn.close()
|
|
|
|
emit('library_search_results', {'parts': parts, 'count': len(parts)})
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
emit('library_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# BOM Generation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@socketio.on('generate_bom')
|
|
def handle_generate_bom():
|
|
try:
|
|
manager = get_variant_manager()
|
|
if not manager:
|
|
emit('bom_error', {'error': 'No project loaded'})
|
|
return
|
|
|
|
schematic_file = app_args.get('Schematic File', '')
|
|
board_file = app_args.get('Board File', '')
|
|
project_name = app_args.get('Project Name', 'project')
|
|
project_dir = app_args.get('Project Dir', '')
|
|
|
|
if not schematic_file:
|
|
emit('bom_error', {'error': 'No schematic file specified'})
|
|
return
|
|
|
|
# Get active variant and DNP parts
|
|
active_variant = manager.get_active_variant()
|
|
dnp_uuids = manager.get_dnp_parts(active_variant)
|
|
|
|
emit('bom_status', {'status': 'Generating BOMs...'})
|
|
|
|
# Call BOM generator script
|
|
bom_script_path = os.path.join(os.path.dirname(__file__), 'bom_generator.py')
|
|
|
|
# Build command with optional PCB file
|
|
cmd = [sys.executable, bom_script_path, schematic_file, project_name, active_variant, json.dumps(dnp_uuids)]
|
|
if board_file:
|
|
cmd.append(board_file)
|
|
|
|
result = subprocess.run(
|
|
cmd,
|
|
capture_output=True,
|
|
text=True
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# Create ZIP of all BOMs
|
|
output_dir = project_dir if project_dir else os.path.dirname(schematic_file)
|
|
base_name = f"{project_name}_{active_variant}"
|
|
|
|
zip_filename = f"{base_name}_BOMs.zip"
|
|
zip_path = os.path.join(output_dir, zip_filename)
|
|
|
|
with zipfile.ZipFile(zip_path, 'w') as zipf:
|
|
# Add all BOM files
|
|
bom_files = [
|
|
f"{base_name}_BOM.xlsx",
|
|
f"{base_name}_Not_Populated.csv",
|
|
f"{base_name}_BOM_Top.xlsx",
|
|
f"{base_name}_BOM_Bottom.xlsx"
|
|
]
|
|
|
|
for bom_file in bom_files:
|
|
full_path = os.path.join(output_dir, bom_file)
|
|
if os.path.exists(full_path):
|
|
zipf.write(full_path, bom_file)
|
|
|
|
emit('bom_complete', {'path': zip_path, 'filename': zip_filename})
|
|
else:
|
|
emit('bom_error', {'error': f'BOM generation failed: {result.stderr}'})
|
|
|
|
except Exception as e:
|
|
import traceback
|
|
emit('bom_error', {'error': f'{str(e)}\n\nTraceback:\n{traceback.format_exc()}'})
|
|
|
|
def shutdown_server():
|
|
print("Server stopped")
|
|
os._exit(0)
|
|
|
|
def parse_args(args):
|
|
"""Parse command line arguments into a dictionary"""
|
|
parsed = {'executable': args[0] if args else ''}
|
|
|
|
i = 1
|
|
while i < len(args):
|
|
if args[i].startswith('--'):
|
|
key = args[i][2:].replace('-', ' ').title()
|
|
if i + 1 < len(args) and not args[i + 1].startswith('--'):
|
|
parsed[key] = args[i + 1]
|
|
i += 2
|
|
else:
|
|
parsed[key] = 'true'
|
|
i += 1
|
|
else:
|
|
i += 1
|
|
|
|
return parsed
|
|
|
|
if __name__ == '__main__':
|
|
# Load configuration
|
|
load_config()
|
|
|
|
# Parse arguments
|
|
app_args = parse_args(sys.argv)
|
|
|
|
# Open browser after short delay
|
|
def open_browser():
|
|
time.sleep(1.5)
|
|
webbrowser.open('http://127.0.0.1:5000')
|
|
|
|
threading.Thread(target=open_browser, daemon=True).start()
|
|
|
|
# Run the app
|
|
print("Starting Flask app...")
|
|
socketio.run(app, debug=False, host='127.0.0.1', port=5000)
|