Files
kicad-manager/app.py
brentperteet 3f0aff923d initial commit
2026-02-22 08:16:48 -06:00

764 lines
27 KiB
Python

import sys
import webbrowser
import threading
import subprocess
import os
import zipfile
import tempfile
import shutil
import json
from pathlib import Path
from flask import Flask, render_template, request, send_file, jsonify
from flask_socketio import SocketIO, emit
import time
from PyPDF2 import PdfMerger
from variant_manager import VariantManager
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, cors_allowed_origins="*")
# Store arguments
app_args = {}
connected_clients = set()
heartbeat_timeout = 5 # seconds
# Configuration
config_file = 'config.json'
app_config = {}
def load_config():
"""Load configuration from file"""
global app_config
if os.path.exists(config_file):
with open(config_file, 'r') as f:
app_config = json.load(f)
else:
app_config = {
'parts_spreadsheet_path': ''
}
return app_config
def save_config():
"""Save configuration to file"""
with open(config_file, 'w') as f:
json.dump(app_config, f, indent=2)
@app.route('/')
def index():
# Reconstruct the command line that invoked this app
cmd_parts = [sys.argv[0]]
for i in range(1, len(sys.argv)):
arg = sys.argv[i]
# Quote arguments with spaces
if ' ' in arg:
cmd_parts.append(f'"{arg}"')
else:
cmd_parts.append(arg)
invocation_cmd = ' '.join(cmd_parts)
return render_template('index.html', args=app_args, invocation_cmd=invocation_cmd)
@socketio.on('connect')
def handle_connect():
connected_clients.add(request.sid)
print(f"Client connected: {request.sid}")
@socketio.on('disconnect')
def handle_disconnect():
connected_clients.discard(request.sid)
print(f"Client disconnected: {request.sid}")
# Shutdown if no clients connected
if not connected_clients:
print("No clients connected. Shutting down...")
threading.Timer(1.0, shutdown_server).start()
@socketio.on('heartbeat')
def handle_heartbeat():
emit('heartbeat_ack')
@socketio.on('generate_pdf')
def handle_generate_pdf():
try:
kicad_cli = app_args.get('Kicad Cli', '')
schematic_file = app_args.get('Schematic File', '')
board_file = app_args.get('Board File', '')
project_dir = app_args.get('Project Dir', '')
project_name = app_args.get('Project Name', 'project')
if not kicad_cli:
emit('pdf_error', {'error': 'Missing kicad-cli argument'})
return
# Create temporary directory for PDFs
temp_dir = tempfile.mkdtemp()
schematics_dir = os.path.join(temp_dir, 'schematics')
board_dir = os.path.join(temp_dir, 'board')
os.makedirs(schematics_dir, exist_ok=True)
os.makedirs(board_dir, exist_ok=True)
# Generate schematic PDF
if schematic_file:
emit('pdf_status', {'status': 'Generating schematic PDF...'})
sch_pdf_path = os.path.join(schematics_dir, f'{project_name}_schematic.pdf')
cmd = [kicad_cli, 'sch', 'export', 'pdf', schematic_file, '-o', sch_pdf_path]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
shutil.rmtree(temp_dir)
emit('pdf_error', {'error': f'Schematic PDF failed: {result.stderr}'})
return
# Generate board layer PDFs - one per layer, then merge
if board_file:
emit('pdf_status', {'status': 'Generating board layer PDFs...'})
# All layers to export
layers = [
('F.Cu', 'Top_Copper'),
('B.Cu', 'Bottom_Copper'),
('F.Silkscreen', 'Top_Silkscreen'),
('B.Silkscreen', 'Bottom_Silkscreen'),
('F.Mask', 'Top_Soldermask'),
('B.Mask', 'Bottom_Soldermask'),
('F.Paste', 'Top_Paste'),
('B.Paste', 'Bottom_Paste'),
('Edge.Cuts', 'Board_Outline'),
('F.Fab', 'Top_Fabrication'),
('B.Fab', 'Bottom_Fabrication'),
]
temp_pdf_dir = os.path.join(temp_dir, 'temp_pdfs')
os.makedirs(temp_pdf_dir, exist_ok=True)
pdf_files = []
for layer_name, file_suffix in layers:
pdf_path = os.path.join(temp_pdf_dir, f'{file_suffix}.pdf')
# Include Edge.Cuts on every layer except the Edge.Cuts layer itself
if layer_name == 'Edge.Cuts':
layers_to_export = layer_name
else:
layers_to_export = f"{layer_name},Edge.Cuts"
cmd = [
kicad_cli, 'pcb', 'export', 'pdf',
board_file,
'-l', layers_to_export,
'--include-border-title',
'-o', pdf_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
pdf_files.append(pdf_path)
else:
print(f"Warning: Failed to generate {layer_name}: {result.stderr}")
# Merge all PDFs into one
if pdf_files:
emit('pdf_status', {'status': 'Merging board layer PDFs...'})
merged_pdf_path = os.path.join(board_dir, f'{project_name}.pdf')
merger = PdfMerger()
for pdf in pdf_files:
merger.append(pdf)
merger.write(merged_pdf_path)
merger.close()
# Delete temp PDF directory
shutil.rmtree(temp_pdf_dir)
# Create ZIP file
emit('pdf_status', {'status': 'Creating ZIP archive...'})
zip_filename = f'{project_name}_PDFs.zip'
zip_path = os.path.join(project_dir if project_dir else temp_dir, zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, dirs, files in os.walk(temp_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, temp_dir)
zipf.write(file_path, arcname)
# Clean up temp directory
shutil.rmtree(temp_dir)
emit('pdf_complete', {'path': zip_path, 'filename': zip_filename})
except Exception as e:
emit('pdf_error', {'error': str(e)})
@socketio.on('generate_gerbers')
def handle_generate_gerbers():
try:
kicad_cli = app_args.get('Kicad Cli', '')
board_file = app_args.get('Board File', '')
project_dir = app_args.get('Project Dir', '')
project_name = app_args.get('Project Name', 'project')
if not kicad_cli or not board_file:
emit('gerber_error', {'error': 'Missing kicad-cli or board-file arguments'})
return
# Create temporary directory for gerbers
temp_dir = tempfile.mkdtemp()
gerber_dir = os.path.join(temp_dir, 'gerbers')
os.makedirs(gerber_dir, exist_ok=True)
# Generate gerbers
emit('gerber_status', {'status': 'Generating gerber files...'})
cmd = [kicad_cli, 'pcb', 'export', 'gerbers', board_file, '-o', gerber_dir]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
shutil.rmtree(temp_dir)
emit('gerber_error', {'error': f'Gerber generation failed: {result.stderr}'})
return
# Generate drill files
emit('gerber_status', {'status': 'Generating drill files...'})
cmd = [kicad_cli, 'pcb', 'export', 'drill', board_file, '-o', gerber_dir]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"Warning: Drill file generation failed: {result.stderr}")
# Generate ODB++ files
emit('gerber_status', {'status': 'Generating ODB++ files...'})
odb_dir = os.path.join(temp_dir, 'odb')
os.makedirs(odb_dir, exist_ok=True)
odb_file = os.path.join(odb_dir, f'{project_name}.zip')
cmd = [kicad_cli, 'pcb', 'export', 'odb', board_file, '-o', odb_file]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
print(f"Warning: ODB++ generation failed: {result.stderr}")
# Create ZIP file
emit('gerber_status', {'status': 'Creating ZIP archive...'})
zip_filename = f'{project_name}_fab.zip'
zip_path = os.path.join(project_dir if project_dir else temp_dir, zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# Add gerbers folder
for root, dirs, files in os.walk(gerber_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.join('gerbers', os.path.basename(file_path))
zipf.write(file_path, arcname)
# Add odb folder
for root, dirs, files in os.walk(odb_dir):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.join('odb', os.path.relpath(file_path, odb_dir))
zipf.write(file_path, arcname)
# Clean up temp directory
shutil.rmtree(temp_dir)
emit('gerber_complete', {'path': zip_path, 'filename': zip_filename})
except Exception as e:
emit('gerber_error', {'error': str(e)})
@socketio.on('sync_libraries')
def handle_sync_libraries():
try:
emit('sync_status', {'status': 'Starting library synchronization...'})
# Check if UM_KICAD is set
um_kicad = os.environ.get('UM_KICAD')
if not um_kicad:
emit('sync_error', {'error': 'UM_KICAD environment variable is not set in the Flask app environment'})
return
emit('sync_status', {'status': f'UM_KICAD is set to: {um_kicad}'})
# Run the add_libraries.py script
script_path = os.path.join(os.path.dirname(__file__), 'add_libraries.py')
if not os.path.exists(script_path):
emit('sync_error', {'error': 'add_libraries.py script not found'})
return
result = subprocess.run(
[sys.executable, script_path],
capture_output=True,
text=True,
env=os.environ.copy()
)
output = result.stdout + result.stderr
if result.returncode == 0:
emit('sync_complete', {'output': output})
else:
emit('sync_error', {'error': f'Sync failed:\n{output}'})
except Exception as e:
emit('sync_error', {'error': str(e)})
@socketio.on('sync_database')
def handle_sync_database():
try:
emit('db_sync_status', {'status': 'Starting database synchronization...'})
# Get the parts spreadsheet path from config
parts_spreadsheet = app_config.get('parts_spreadsheet_path', '')
if not parts_spreadsheet:
emit('db_sync_error', {'error': 'Parts spreadsheet path not configured. Please set it in Settings.'})
return
if not os.path.exists(parts_spreadsheet):
emit('db_sync_error', {'error': f'Parts spreadsheet not found at: {parts_spreadsheet}'})
return
emit('db_sync_status', {'status': f'Using parts spreadsheet: {parts_spreadsheet}'})
# Run the gen_resistors_db.py script
script_path = os.path.join(os.path.dirname(__file__), 'gen_resistors_db.py')
if not os.path.exists(script_path):
emit('db_sync_error', {'error': 'gen_resistors_db.py script not found'})
return
result = subprocess.run(
[sys.executable, script_path, parts_spreadsheet],
capture_output=True,
text=True,
env=os.environ.copy()
)
output = result.stdout + result.stderr
if result.returncode == 0:
emit('db_sync_complete', {'output': output})
else:
emit('db_sync_error', {'error': f'Database sync failed:\n{output}'})
except Exception as e:
emit('db_sync_error', {'error': str(e)})
@socketio.on('init_user')
def handle_init_user():
try:
emit('init_status', {'status': 'Starting user environment initialization...'})
# Check if UM_KICAD is set
um_kicad = os.environ.get('UM_KICAD')
if not um_kicad:
emit('init_error', {'error': 'UM_KICAD environment variable is not set'})
return
emit('init_status', {'status': f'UM_KICAD: {um_kicad}'})
# Run the init_user.py script
script_path = os.path.join(os.path.dirname(__file__), 'init_user.py')
if not os.path.exists(script_path):
emit('init_error', {'error': 'init_user.py script not found'})
return
result = subprocess.run(
[sys.executable, script_path],
capture_output=True,
text=True,
env=os.environ.copy()
)
output = result.stdout + result.stderr
if result.returncode == 0:
emit('init_complete', {'output': output})
else:
emit('init_error', {'error': f'Initialization failed:\n{output}'})
except Exception as e:
emit('init_error', {'error': str(e)})
@app.route('/download/<path:filename>')
def download_file(filename):
project_dir = app_args.get('Project Dir', '')
file_path = os.path.join(project_dir, filename)
if os.path.exists(file_path):
return send_file(file_path, as_attachment=True)
return "File not found", 404
@app.route('/config', methods=['GET', 'POST'])
def config():
if request.method == 'POST':
data = request.get_json()
app_config['parts_spreadsheet_path'] = data.get('parts_spreadsheet_path', '')
save_config()
return jsonify({'status': 'success', 'config': app_config})
else:
return jsonify(app_config)
@app.route('/variants')
def variants_page():
return render_template('variants.html')
# ---------------------------------------------------------------------------
# Variant Management Socket Handlers
# ---------------------------------------------------------------------------
def get_variant_manager():
"""Get VariantManager instance for current project"""
schematic_file = app_args.get('Schematic File', '')
if not schematic_file or not os.path.exists(schematic_file):
return None
return VariantManager(schematic_file)
def get_all_schematic_files(root_schematic):
"""Get all schematic files in a hierarchical design"""
from pathlib import Path
root_path = Path(root_schematic)
if not root_path.exists():
return [root_schematic]
schematic_files = [str(root_path)]
schematic_dir = root_path.parent
try:
with open(root_path, 'r', encoding='utf-8') as f:
content = f.read()
for line in content.split('\n'):
if '(property "Sheetfile"' in line:
parts = line.split('"')
if len(parts) >= 4:
sheet_file = parts[3]
sheet_path = schematic_dir / sheet_file
if sheet_path.exists():
sub_sheets = get_all_schematic_files(str(sheet_path))
for sub in sub_sheets:
if sub not in schematic_files:
schematic_files.append(sub)
except:
pass
return schematic_files
def get_all_parts_from_schematic():
"""Get all component references, values, and UUIDs from all schematics (including hierarchical sheets)"""
schematic_file = app_args.get('Schematic File', '')
if not schematic_file or not os.path.exists(schematic_file):
return []
# Get all schematic files
all_schematics = get_all_schematic_files(schematic_file)
all_parts = {} # uuid -> {reference, value}
for sch_file in all_schematics:
try:
with open(sch_file, 'r', encoding='utf-8') as f:
content = f.read()
lines = content.split('\n')
in_symbol = False
current_uuid = None
current_ref = None
current_value = None
current_lib_id = None
for line in lines:
stripped = line.strip()
# Detect start of symbol
if stripped.startswith('(symbol'):
in_symbol = True
current_uuid = None
current_ref = None
current_value = None
current_lib_id = None
# Detect end of symbol
elif in_symbol and stripped == ')':
# Save the part if we have all the info, excluding power symbols
is_power = current_lib_id and 'power:' in current_lib_id
is_power = is_power or (current_ref and current_ref.startswith('#'))
if current_uuid and current_ref and not is_power and len(current_ref) > 1:
all_parts[current_uuid] = {
'reference': current_ref,
'value': current_value or ''
}
in_symbol = False
# Extract lib_id to check for power symbols
elif in_symbol and '(lib_id' in stripped:
lib_parts = line.split('"')
if len(lib_parts) >= 2:
current_lib_id = lib_parts[1]
# Extract UUID
elif in_symbol and '(uuid' in stripped:
uuid_parts = line.split('"')
if len(uuid_parts) >= 2:
current_uuid = uuid_parts[1]
# Extract reference - format: (property "Reference" "U1" ...
elif in_symbol and '(property "Reference"' in line:
try:
start = line.find('"Reference"') + len('"Reference"')
remainder = line[start:]
quote_start = remainder.find('"')
if quote_start != -1:
quote_end = remainder.find('"', quote_start + 1)
if quote_end != -1:
current_ref = remainder[quote_start + 1:quote_end]
except:
pass
# Extract value - format: (property "Value" "LM358" ...
elif in_symbol and '(property "Value"' in line:
try:
start = line.find('"Value"') + len('"Value"')
remainder = line[start:]
quote_start = remainder.find('"')
if quote_start != -1:
quote_end = remainder.find('"', quote_start + 1)
if quote_end != -1:
current_value = remainder[quote_start + 1:quote_end]
except:
pass
except Exception as e:
print(f"Error reading schematic {sch_file}: {e}")
return [{'uuid': uuid, 'reference': data['reference'], 'value': data['value']}
for uuid, data in sorted(all_parts.items(), key=lambda x: x[1]['reference'])]
@socketio.on('get_variants')
def handle_get_variants():
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
all_parts = get_all_parts_from_schematic()
emit('variants_data', {
'project_name': manager.project_name,
'variants': manager.get_variants(),
'active_variant': manager.get_active_variant(),
'all_parts': all_parts # Now includes uuid, reference, and value
})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('create_variant')
def handle_create_variant(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
name = data.get('name', '')
description = data.get('description', '')
based_on = data.get('based_on', None)
if not name:
emit('variant_error', {'error': 'Variant name required'})
return
success = manager.create_variant(name, description, based_on)
if success:
emit('variant_updated', {'message': f'Variant "{name}" created'})
else:
emit('variant_error', {'error': f'Variant "{name}" already exists'})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('delete_variant')
def handle_delete_variant(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
name = data.get('name', '')
success = manager.delete_variant(name)
if success:
emit('variant_updated', {'message': f'Variant "{name}" deleted'})
else:
emit('variant_error', {'error': f'Cannot delete variant "{name}"'})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('activate_variant')
def handle_activate_variant(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
name = data.get('name', '')
schematic_file = app_args.get('Schematic File', '')
kicad_cli = app_args.get('Kicad Cli', 'kicad-cli')
# First, sync the current variant from schematic to capture any manual changes
current_variant = manager.get_active_variant()
print(f"Syncing current variant '{current_variant}' before switching...")
# Import and call sync function directly instead of subprocess
from sync_variant import sync_variant_from_schematic
try:
sync_success = sync_variant_from_schematic(schematic_file, current_variant)
if sync_success:
print(f"Successfully synced variant '{current_variant}'")
# Reload the manager to get the updated data
manager = get_variant_manager()
else:
print(f"Warning: Sync of variant '{current_variant}' failed")
except Exception as e:
print(f"Error during sync: {e}")
import traceback
traceback.print_exc()
# Now activate the new variant
success = manager.set_active_variant(name)
if success:
# Apply new variant to schematic
apply_script_path = os.path.join(os.path.dirname(__file__), 'apply_variant.py')
result = subprocess.run(
[sys.executable, apply_script_path, schematic_file, name, kicad_cli],
capture_output=True,
text=True
)
if result.returncode == 0:
emit('variant_updated', {'message': f'Synced "{current_variant}", then activated and applied "{name}"'})
else:
error_msg = result.stderr if result.stderr else result.stdout
emit('variant_error', {'error': f'Failed to apply variant: {error_msg}'})
else:
emit('variant_error', {'error': f'Variant "{name}" not found'})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('get_variant_parts')
def handle_get_variant_parts(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
variant_name = data.get('variant', '')
all_parts = get_all_parts_from_schematic()
dnp_uuids = manager.get_dnp_parts(variant_name)
parts_data = []
for part in all_parts:
parts_data.append({
'uuid': part['uuid'],
'reference': part['reference'],
'value': part['value'],
'is_dnp': part['uuid'] in dnp_uuids
})
emit('variant_parts_data', {'parts': parts_data})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('set_part_dnp')
def handle_set_part_dnp(data):
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
variant = data.get('variant', '')
uuid = data.get('uuid', '')
is_dnp = data.get('is_dnp', False)
success = manager.set_part_dnp(variant, uuid, is_dnp)
if success:
# Re-send updated parts list
handle_get_variant_parts({'variant': variant})
else:
emit('variant_error', {'error': 'Failed to update part'})
except Exception as e:
emit('variant_error', {'error': str(e)})
@socketio.on('sync_from_schematic')
def handle_sync_from_schematic():
try:
manager = get_variant_manager()
if not manager:
emit('variant_error', {'error': 'No project loaded'})
return
# Read DNP state from schematic and update active variant
schematic_file = app_args.get('Schematic File', '')
script_path = os.path.join(os.path.dirname(__file__), 'sync_variant.py')
result = subprocess.run(
[sys.executable, script_path, schematic_file],
capture_output=True,
text=True
)
if result.returncode == 0:
emit('sync_complete', {'message': f'Synced from schematic:\n{result.stdout}'})
else:
emit('variant_error', {'error': f'Failed to sync: {result.stderr}'})
except Exception as e:
emit('variant_error', {'error': str(e)})
def shutdown_server():
print("Server stopped")
os._exit(0)
def parse_args(args):
"""Parse command line arguments into a dictionary"""
parsed = {'executable': args[0] if args else ''}
i = 1
while i < len(args):
if args[i].startswith('--'):
key = args[i][2:].replace('-', ' ').title()
if i + 1 < len(args) and not args[i + 1].startswith('--'):
parsed[key] = args[i + 1]
i += 2
else:
parsed[key] = 'true'
i += 1
else:
i += 1
return parsed
if __name__ == '__main__':
# Load configuration
load_config()
# Parse arguments
app_args = parse_args(sys.argv)
# Open browser after short delay
def open_browser():
time.sleep(1.5)
webbrowser.open('http://127.0.0.1:5000')
threading.Thread(target=open_browser, daemon=True).start()
# Run the app
print("Starting Flask app...")
socketio.run(app, debug=False, host='127.0.0.1', port=5000)