289 lines
10 KiB
Python
289 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
gen_resistors_db.py
|
|
===================
|
|
Reads the approved parts list spreadsheet and adds surface mount resistor
|
|
and capacitor records to the KiCad SQLite database.
|
|
|
|
Processes all SMD resistors (0402, 0603, 0805, etc.) and capacitors from
|
|
the spreadsheet.
|
|
|
|
Each part becomes a database record with:
|
|
ipn ← GLE P/N (or generated ID if missing)
|
|
description ← Description column
|
|
value ← Value1 (e.g. "10k", "4.7k", "100")
|
|
footprint ← Standard KiCad footprint based on size (e.g., "Resistor_SMD:R_0402_1005Metric")
|
|
fp_display ← Footprint column from spreadsheet (for display purposes)
|
|
symbol ← "UM_template:R" for resistors, "UM_template:C" for capacitors
|
|
mpn ← Mfg.1 P/N
|
|
manufacturer ← Mfg.1
|
|
datasheet ← (empty for now)
|
|
class ← Class column
|
|
|
|
Where multiple approved vendors share the same value+tolerance+footprint,
|
|
only the first row is used (duplicates are reported and skipped).
|
|
|
|
Usage:
|
|
python3 gen_resistors_db.py <parts_list.xlsx>
|
|
|
|
The script reads the database path from ../database/parts.sqlite relative
|
|
to this script.
|
|
"""
|
|
|
|
import re
|
|
import sys
|
|
import sqlite3
|
|
import pandas as pd
|
|
from pathlib import Path
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_footprint(description: str, part_type: str) -> str:
|
|
"""
|
|
Extract footprint size from description and return standard KiCad footprint.
|
|
|
|
Args:
|
|
description: Part description containing size (e.g., "0402", "0603")
|
|
part_type: "resistor" or "capacitor"
|
|
|
|
Returns:
|
|
Standard KiCad footprint string
|
|
"""
|
|
# Footprint size mapping to KiCad standard footprints
|
|
resistor_footprints = {
|
|
'0201': 'Resistor_SMD:R_0201_0603Metric',
|
|
'0402': 'Resistor_SMD:R_0402_1005Metric',
|
|
'0603': 'Resistor_SMD:R_0603_1608Metric',
|
|
'0805': 'Resistor_SMD:R_0805_2012Metric',
|
|
'1206': 'Resistor_SMD:R_1206_3216Metric',
|
|
'1210': 'Resistor_SMD:R_1210_3225Metric',
|
|
'2010': 'Resistor_SMD:R_2010_5025Metric',
|
|
'2512': 'Resistor_SMD:R_2512_6332Metric',
|
|
}
|
|
|
|
capacitor_footprints = {
|
|
'0201': 'Capacitor_SMD:C_0201_0603Metric',
|
|
'0402': 'Capacitor_SMD:C_0402_1005Metric',
|
|
'0603': 'Capacitor_SMD:C_0603_1608Metric',
|
|
'0805': 'Capacitor_SMD:C_0805_2012Metric',
|
|
'1206': 'Capacitor_SMD:C_1206_3216Metric',
|
|
'1210': 'Capacitor_SMD:C_1210_3225Metric',
|
|
'2010': 'Capacitor_SMD:C_2010_5025Metric',
|
|
'2512': 'Capacitor_SMD:C_2512_6332Metric',
|
|
}
|
|
|
|
# Extract size from description
|
|
size_match = re.search(r'\b(0201|0402|0603|0805|1206|1210|2010|2512)\b', description)
|
|
if not size_match:
|
|
return ""
|
|
|
|
size = size_match.group(1)
|
|
|
|
if part_type == "resistor":
|
|
return resistor_footprints.get(size, "")
|
|
elif part_type == "capacitor":
|
|
return capacitor_footprints.get(size, "")
|
|
|
|
return ""
|
|
|
|
|
|
def process_parts(parts_df: pd.DataFrame, part_type: str, symbol: str,
|
|
cursor, existing_ipns: set) -> tuple[int, int, list]:
|
|
"""
|
|
Process a dataframe of parts (resistors or capacitors) and insert/update in database.
|
|
|
|
Args:
|
|
parts_df: DataFrame containing the parts to process
|
|
part_type: "resistor" or "capacitor" (for reporting)
|
|
symbol: KiCad symbol reference (e.g., "UM_template:R")
|
|
cursor: Database cursor
|
|
existing_ipns: Set of existing IPNs in database
|
|
|
|
Returns:
|
|
Tuple of (added_count, updated_count, skipped_list)
|
|
"""
|
|
added = 0
|
|
updated = 0
|
|
skipped = []
|
|
seen_parts: dict[str, str] = {} # value+tol+footprint → GLE P/N of first occurrence
|
|
|
|
for _, row in parts_df.iterrows():
|
|
gle_pn = str(row['GLE P/N']).strip()
|
|
value = str(row['Value1']).strip()
|
|
description = str(row['Description']).strip()
|
|
mfg = str(row['Mfg.1']).strip()
|
|
mpn = str(row['Mfg.1 P/N']).strip()
|
|
part_class = str(row.get('Class', '')).strip()
|
|
fp_display = str(row.get('Footprint', '')).strip() # From spreadsheet for display
|
|
|
|
if not gle_pn:
|
|
skipped.append((value, '(no GLE P/N)'))
|
|
continue
|
|
|
|
# Get standard KiCad footprint based on size in description
|
|
footprint = get_footprint(description, part_type)
|
|
if not footprint:
|
|
skipped.append((value, f'could not determine footprint size from: {description}'))
|
|
continue
|
|
|
|
# Create unique key from value+tolerance+footprint to detect duplicates
|
|
# Extract tolerance from description
|
|
tol_match = re.search(r'(\d+(?:\.\d+)?%)', description)
|
|
tolerance = tol_match.group(1) if tol_match else 'X'
|
|
part_key = f"{value}_{tolerance}_{footprint}"
|
|
|
|
# Skip duplicate value+tolerance+footprint combinations (alternate approved vendors)
|
|
if part_key in seen_parts:
|
|
skipped.append((value, f'dup value/tol/fp, first: {seen_parts[part_key]}, this: {gle_pn}'))
|
|
continue
|
|
seen_parts[part_key] = gle_pn
|
|
|
|
# Prepare database record
|
|
ipn = gle_pn
|
|
datasheet = "" # Could be populated from spreadsheet if available
|
|
|
|
# Insert or update record
|
|
if ipn in existing_ipns:
|
|
cursor.execute("""
|
|
UPDATE parts
|
|
SET description = ?, value = ?, footprint = ?, symbol = ?,
|
|
mpn = ?, manufacturer = ?, datasheet = ?, class = ?, fp_display = ?
|
|
WHERE ipn = ?
|
|
""", (description, value, footprint, symbol, mpn, mfg, datasheet, part_class, fp_display, ipn))
|
|
updated += 1
|
|
else:
|
|
cursor.execute("""
|
|
INSERT INTO parts (ipn, description, value, footprint, symbol, mpn, manufacturer, datasheet, class, fp_display)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
""", (ipn, description, value, footprint, symbol, mpn, mfg, datasheet, part_class, fp_display))
|
|
added += 1
|
|
existing_ipns.add(ipn)
|
|
|
|
return added, updated, skipped
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main(xlsx_path: Path, db_path: Path):
|
|
# ---- Load spreadsheet ----
|
|
df = pd.read_excel(xlsx_path, sheet_name='PCB', dtype=str)
|
|
df = df.fillna('')
|
|
|
|
# Filter to SMD resistors and capacitors
|
|
# Match common SMD footprints: 0402, 0603, 0805, 1206, etc.
|
|
smd_pattern = r'0(201|402|603|805)|1206|1210|2010|2512'
|
|
|
|
resistor_mask = (
|
|
df['Footprint'].str.contains(smd_pattern, na=False, regex=True) &
|
|
df['Description'].str.contains('[Rr]es', na=False, regex=True)
|
|
)
|
|
resistors = df[resistor_mask].copy()
|
|
|
|
capacitor_mask = (
|
|
df['Footprint'].str.contains(smd_pattern, na=False, regex=True) &
|
|
df['Description'].str.contains('[Cc]ap', na=False, regex=True)
|
|
)
|
|
capacitors = df[capacitor_mask].copy()
|
|
|
|
print(f"Found {len(resistors)} SMD resistors in parts list")
|
|
print(f"Found {len(capacitors)} SMD capacitors in parts list")
|
|
|
|
# ---- Connect to database ----
|
|
conn = sqlite3.connect(db_path)
|
|
cursor = conn.cursor()
|
|
|
|
# Get existing IPNs to check for duplicates
|
|
cursor.execute("SELECT ipn FROM parts")
|
|
existing_ipns = set(row[0] for row in cursor.fetchall())
|
|
|
|
# ---- Process resistors ----
|
|
print("\nProcessing resistors...")
|
|
r_added, r_updated, r_skipped = process_parts(
|
|
resistors, "resistor", "UM_template:R", cursor, existing_ipns
|
|
)
|
|
|
|
# ---- Process capacitors ----
|
|
print("Processing capacitors...")
|
|
c_added, c_updated, c_skipped = process_parts(
|
|
capacitors, "capacitor", "UM_template:C", cursor, existing_ipns
|
|
)
|
|
|
|
# Commit changes
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Report results
|
|
print(f"\n{'='*60}")
|
|
print("Database updated:")
|
|
print(f"{'='*60}")
|
|
print(f"\nResistors:")
|
|
print(f" Added: {r_added}")
|
|
print(f" Updated: {r_updated}")
|
|
print(f" Skipped: {len(r_skipped)} (duplicates or missing data)")
|
|
|
|
print(f"\nCapacitors:")
|
|
print(f" Added: {c_added}")
|
|
print(f" Updated: {c_updated}")
|
|
print(f" Skipped: {len(c_skipped)} (duplicates or missing data)")
|
|
|
|
print(f"\nTotals:")
|
|
print(f" Added: {r_added + c_added}")
|
|
print(f" Updated: {r_updated + c_updated}")
|
|
print(f" Skipped: {len(r_skipped) + len(c_skipped)}")
|
|
|
|
# Show some skipped items if any
|
|
all_skipped = r_skipped + c_skipped
|
|
if all_skipped:
|
|
print(f"\n Sample skipped items:")
|
|
for val, reason in all_skipped[:10]: # Show first 10
|
|
print(f" {val}: {reason}")
|
|
if len(all_skipped) > 10:
|
|
print(f" ... and {len(all_skipped) - 10} more")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
# Get paths
|
|
script_dir = Path(__file__).parent
|
|
|
|
# Database path
|
|
db_path = script_dir.parent / 'database' / 'parts.sqlite'
|
|
if not db_path.exists():
|
|
print(f"Error: database not found at {db_path}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Spreadsheet path - try command line arg, then config file
|
|
if len(sys.argv) >= 2:
|
|
xlsx_path = Path(sys.argv[1])
|
|
else:
|
|
# Try to read from config.json
|
|
import json
|
|
config_file = script_dir / 'config.json'
|
|
if config_file.exists():
|
|
with open(config_file, 'r') as f:
|
|
config = json.load(f)
|
|
xlsx_str = config.get('parts_spreadsheet_path', '')
|
|
if xlsx_str:
|
|
xlsx_path = Path(xlsx_str)
|
|
else:
|
|
print("Error: no parts_spreadsheet_path in config.json", file=sys.stderr)
|
|
sys.exit(1)
|
|
else:
|
|
print("Error: no spreadsheet path provided and config.json not found", file=sys.stderr)
|
|
print("Usage: python3 gen_resistors_db.py <parts_list.xlsx>", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if not xlsx_path.exists():
|
|
print(f"Error: spreadsheet not found at {xlsx_path}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
print(f"Reading parts from: {xlsx_path}")
|
|
print(f"Database: {db_path}")
|
|
print()
|
|
|
|
main(xlsx_path, db_path)
|