Files

2254 lines
96 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from flask import Blueprint, render_template, request, jsonify, flash, redirect, url_for, json
from flask_login import login_required, current_user
from app.extensions import db
from app.models.project import Project
from app.models.company import Company
from app.models.aufmass import Aufmass
from app.models.aufmass_typ import AufmassTyp
from app.models.position import Position
from app.models.lv import LVPosition
from app.models.contract import Contract
from app.models.project_access import ProjectAccess
from app.models.user import User
from app.models.aufmass_history import AufmassHistory
from app.services.license_service import get_aktive_module
from app.models.custom_module import CustomModule, CustomModuleAssignment
from datetime import datetime
aufmass_bp = Blueprint('aufmass', __name__)
def _check_zugriff(project, required='lesen'):
if not current_user.hat_zugriff(project, required):
return False
return True
def _get_aufmass_or_404(aufmass_id, project_id):
a = Aufmass.query.get_or_404(aufmass_id)
if a.project_id != project_id:
return None
return a
def _build_history_description(action_type, pos_data_list, user_name=''):
"""Baut detaillierte History-Beschreibung.
Args:
action_type: 'add', 'delete', 'edit', 'move', 'copy'
pos_data_list: Liste von Dict mit position_id, pos_nr, kurztext, old, new
user_name: Name des Benutzers
Returns:
Tuple (kurze_beschreibung, lange_beschreibung)
"""
from datetime import datetime
now = datetime.now().strftime('%d.%m.%Y %H:%M')
def _format_field_diff(old_val, new_val, field):
"""Formatiert Unterschied für ein Feld."""
old_str = str(old_val) if old_val is not None else 'leer'
new_str = str(new_val) if new_val is not None else 'leer'
if old_str == new_str:
return None
return f"{field}: {old_str}{new_str}"
if not pos_data_list:
return ('', '')
short_parts = []
long_parts = []
for pd in pos_data_list:
pos_label = f"Zeile ?, Pos {pd.get('pos_nr', pd.get('position_id', '?'))}"
row_idx = pd.get('row_idx', 0) + 1 if pd.get('row_idx') is not None else '?'
kurztext = pd.get('kurztext', '')[:30]
if kurztext:
kurztext = f'"{kurztext}"'
if action_type == 'add':
short_parts.append(f"+ Pos {pd.get('pos_nr', '?')} ({kurztext})")
long_parts.append(f"+ Zeile {row_idx}: Pos {pd.get('pos_nr', '?')} {kurztext}")
elif action_type == 'delete':
short_parts.append(f"- Pos {pd.get('pos_nr', '?')} ({kurztext})")
long_parts.append(f"- Zeile {row_idx}: Pos {pd.get('pos_nr', '?')} {kurztext}")
elif action_type == 'edit':
old_vals = pd.get('old', {})
new_vals = pd.get('new', {})
field_labels = {
'faktor': 'Faktor', 'laenge': 'Länge', 'breite': 'Breite', 'tiefe': 'Tiefe',
'menge': 'Menge', 'menge_hinten': 'Menge hinten', 'einheit': 'EH',
'abschnitt': 'Abschnitt', 'bemerkung': 'Bemerkung', 'formel': 'Formel',
'formel_typ': 'Z-Art', 'kurztext': 'Kurztext'
}
changes = []
for field in set(list(old_vals.keys()) + list(new_vals.keys())):
if field.startswith('_') or field == 'id':
continue
label = field_labels.get(field, field)
old_v = old_vals.get(field)
new_v = new_vals.get(field)
diff = _format_field_diff(old_v, new_v, label)
if diff:
changes.append(diff)
if changes:
short_parts.append(f"✎ Pos {pd.get('pos_nr', '?')}: {', '.join(changes[:2])}")
long_parts.append(f"✎ Zeile {row_idx}: Pos {pd.get('pos_nr', '?')} {kurztext} - {'; '.join(changes)}")
elif action_type == 'move':
old_idx = pd.get('old_idx', pd.get('old', {}).get('sort_order', '?'))
new_idx = pd.get('new_idx', pd.get('new', {}).get('sort_order', '?'))
if old_idx != new_idx:
direction = '' if new_idx > old_idx else ''
short_parts.append(f"↔ Pos {pd.get('pos_nr', '?')}: {old_idx} {direction} {new_idx}")
long_parts.append(f"↔ Zeile {old_idx}{new_idx}: Pos {pd.get('pos_nr', '?')} {kurztext} {direction}")
elif action_type == 'copy':
short_parts.append(f"⎘ Pos {pd.get('pos_nr', '?')} kopiert")
long_parts.append(f"⎘ Zeile {row_idx}: Pos {pd.get('pos_nr', '?')} {kurztext} kopiert")
short = ', '.join(short_parts[:3])
if len(short_parts) > 3:
short += f' (+{len(short_parts)-3})'
long = f"[{now}] {user_name}\n" + '\n'.join(long_parts)
return (short, long)
# === Projekt-Liste (alle Aufmaß-Projekte) ===
@aufmass_bp.route('/')
@login_required
def index():
if current_user.is_firmadmin():
projekte = Project.query.filter_by(
company_id=current_user.company_id
).order_by(db.func.coalesce(Project.bezeichnung, '').asc()).all()
elif current_user.is_superadmin():
projekte = Project.query.order_by(db.func.coalesce(Project.bezeichnung, '').asc()).all()
else:
zugriff_ids = db.session.query(ProjectAccess.project_id).filter_by(
user_id=current_user.id
).subquery()
projekte = Project.query.filter(Project.id.in_(zugriff_ids)).order_by(db.func.coalesce(Project.bezeichnung, '').asc()).all()
preise_sichtbar = current_user.is_superadmin() or current_user.is_firmadmin() or current_user.darf_preise_sehen
data = []
from app.models.position import Position
gesamt_summe = 0.0
gesamt_positionen = 0
for p in projekte:
aufmass_liste = Aufmass.query.filter_by(project_id=p.id).order_by(Aufmass.sortierung).all()
projekt_summe = 0.0
projekt_positionen = 0
aufmass_data = []
for a in aufmass_liste:
summe = db.session.query(db.func.coalesce(db.func.sum(Position.gesamtpreis), 0)).filter(
Position.aufmass_id == a.id
).scalar() or 0.0
projekt_summe += summe
anzahl = Position.query.filter_by(aufmass_id=a.id).count()
projekt_positionen += anzahl
aufmass_data.append({'aufmass': a, 'summe': summe, 'positionen': anzahl})
gesamt_summe += projekt_summe
gesamt_positionen += projekt_positionen
data.append({'project': p, 'aufmass_liste': aufmass_data, 'summe': projekt_summe, 'positionen': projekt_positionen})
typen = AufmassTyp.query.order_by(AufmassTyp.sortierung).all()
if current_user.company_id:
contracts = Contract.query.filter_by(
company_id=current_user.company_id
).order_by(Contract.name).all()
company = Company.query.get(current_user.company_id)
else:
contracts = Contract.query.order_by(Contract.name).all()
company = None
return render_template('aufmass/index.html', projekte=data, titel='Aufmaß-Projekte',
gesamt_summe=gesamt_summe, gesamt_positionen=gesamt_positionen,
preise_sichtbar=preise_sichtbar, typen=typen, contracts=contracts, company=company)
@aufmass_bp.route('/neu', methods=['GET', 'POST'])
@login_required
def neu():
if not current_user.is_firmadmin() and not current_user.darf_projekte_anlegen:
flash('Keine Berechtigung.', 'danger')
return redirect(url_for('aufmass.index'))
contracts = Contract.query.filter_by(
company_id=current_user.company_id
).order_by(Contract.name).all()
if request.method == 'POST':
contract_id = request.form.get('contract_id', type=int)
contract = Contract.query.get(contract_id) if contract_id else None
bezeichnung = request.form.get('bezeichnung', '').strip()
project = Project(
company_id=current_user.company_id,
contract_id=contract_id,
sm_nr='',
bezeichnung=bezeichnung,
vertrag=contract.name if contract else request.form.get('vertrag', '').strip(),
lv_name=request.form.get('lv_name', '').strip(),
status='aktiv',
erstellt_von=current_user.id,
)
db.session.add(project)
db.session.commit()
flash(f'Projekt "{bezeichnung or project.id}" angelegt.', 'success')
return redirect(url_for('aufmass.aufmass_list', project_id=project.id))
return render_template('aufmass/neu.html', contracts=contracts, titel='Neues Aufmaß')
# === Aufmaß-Liste innerhalb eines Projekts ===
@aufmass_bp.route('/<int:project_id>')
@login_required
def aufmass_list(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'lesen'):
return 'Zugriff verweigert', 403
aufmass_liste = Aufmass.query.filter_by(project_id=project_id).order_by(Aufmass.sortierung).all()
typen = AufmassTyp.query.order_by(AufmassTyp.sortierung).all()
contracts = Contract.query.filter_by(company_id=project.company_id).order_by(Contract.name).all()
preise_sichtbar = current_user.is_superadmin() or current_user.is_firmadmin() or current_user.darf_preise_sehen
# Preise pro Aufmaß berechnen
from app.models.position import Position
aufmass_preise = {}
for a in aufmass_liste:
summe = db.session.query(db.func.coalesce(db.func.sum(Position.gesamtpreis), 0)).filter(
Position.aufmass_id == a.id
).scalar() or 0.0
aufmass_preise[a.id] = summe
lv_names = [r[0] for r in db.session.query(LVPosition.lv_name).filter_by(
company_id=project.company_id
).distinct().order_by(LVPosition.lv_name).all()]
return render_template('aufmass/list.html', project=project,
aufmass_liste=aufmass_liste, typen=typen,
preise_sichtbar=preise_sichtbar, aufmass_preise=aufmass_preise,
contracts=contracts, lv_names=lv_names, company=current_user.company,
titel=f'Aufmaße {project.bezeichnung or project.sm_nr}')
# === Aufmaß CRUD ===
@aufmass_bp.route('/<int:project_id>/aufmass/neu', methods=['POST'])
@login_required
def aufmass_neu(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
name = request.form.get('name', '').strip() or 'Neues Aufmaß'
typ = request.form.get('typ', '').strip()
max_sort = db.session.query(db.func.max(Aufmass.sortierung)).filter_by(project_id=project_id).scalar() or 0
a = Aufmass(project_id=project_id, name=name, typ=typ, sortierung=max_sort + 1, erstellt_von=current_user.id)
db.session.add(a)
db.session.commit()
flash(f'Aufmaß "{name}" angelegt.', 'success')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
@aufmass_bp.route('/<int:project_id>/aufmass/neu-voll', methods=['POST'])
@login_required
def aufmass_neu_voll(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
contract_id = request.form.get('contract_id', type=int)
contract = Contract.query.get(contract_id) if contract_id else None
bauabschnitt = request.form.get('bauabschnitt', '').strip()
import re
invalid_chars = r'[<>:"/\\|?*&#%{}~\[\]]'
if re.search(invalid_chars, bauabschnitt):
flash('Bauabschnitt enthält ungültige Zeichen.', 'danger')
return redirect(url_for('aufmass.index'))
project.baustelle = request.form.get('baustelle', '').strip()
project.sm_nr = request.form.get('sm_nr', '').strip()
project.vertrag = contract.name if contract else request.form.get('vertrag', '').strip()
project.abruf_nr = request.form.get('abruf_nr', '').strip()
project.lv_name = request.form.get('lv_name', '').strip()
project.datum_start = _parse_date(request.form.get('datum_start'))
project.datum_ende = _parse_date(request.form.get('datum_ende'))
project.datum = _parse_date(request.form.get('datum'))
project.ev_details_id = request.form.get('ev_details_id', '').strip()
project.ansprechpartner_vorname = request.form.get('ansprechpartner_vorname', '').strip()
project.ansprechpartner_nachname = request.form.get('ansprechpartner_nachname', '').strip()
project.ansprechpartner_tel = request.form.get('ansprechpartner_tel', '').strip()
project.ansprechpartner_email = request.form.get('ansprechpartner_email', '').strip()
project.bauabschnitt = bauabschnitt
typ = request.form.get('typ', '').strip()
name_parts = [s for s in [project.bezeichnung, project.bauabschnitt, project.sm_nr, project.abruf_nr] if s]
name = ' - '.join(name_parts) if name_parts else 'Neues Aufmass'
name = re.sub(r'\s+', ' ', name).strip()
max_sort = db.session.query(db.func.max(Aufmass.sortierung)).filter_by(project_id=project_id).scalar() or 0
a = Aufmass(project_id=project_id, name=name, typ=typ, sortierung=max_sort + 1, erstellt_von=current_user.id)
db.session.add(a)
db.session.commit()
flash(f'Aufmaß "{name}" für "{project.bezeichnung or project.sm_nr}" angelegt.', 'success')
return redirect(url_for('aufmass.index'))
@aufmass_bp.route('/<int:project_id>/aufmass/import', methods=['POST'])
@login_required
def aufmass_import(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
if 'file' not in request.files:
flash('Keine Datei ausgewählt.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
f = request.files['file']
if f.filename == '':
flash('Keine Datei ausgewählt.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
try:
content = f.read().decode('utf-8-sig').splitlines()
except Exception:
content = f.read().decode('latin-1').splitlines()
lines = [l.strip() for l in content if l.strip()]
if len(lines) < 16 or lines[0] != '[Kopfdaten]':
flash('Ungültiges Dateiformat.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
def kv(val):
return val.split('=', 1)[1] if '=' in val else ''
teilaufma = kv(lines[1])
schlussaufma = kv(lines[2])
datum_str = kv(lines[3])
baustelle = kv(lines[4])
abruf_nr = kv(lines[5])
sm_nr = kv(lines[6])
startz_str = kv(lines[8])
endz_str = kv(lines[9])
aspa_name = kv(lines[10])
aspa_tel = kv(lines[11])
bauabschnitt = kv(lines[12])
# Typ ermitteln
typ = ''
if schlussaufma == 'X':
typ = 'Schlussaufmaß'
elif teilaufma == 'X':
typ = 'Teilaufmaß/AZ'
aspa_vorname = aspa_name
aspa_nachname = ''
if ' ' in aspa_name.strip():
p = aspa_name.strip().rsplit(' ', 1)
aspa_vorname = p[0]
aspa_nachname = p[1]
project.baustelle = baustelle or project.baustelle
project.bauabschnitt = bauabschnitt or project.bauabschnitt
project.abruf_nr = abruf_nr or project.abruf_nr
project.sm_nr = sm_nr or project.sm_nr
project.datum = _parse_date(datum_str) or project.datum
project.datum_start = _parse_date(startz_str) or project.datum_start
project.datum_ende = _parse_date(endz_str) or project.datum_ende
project.ansprechpartner_vorname = aspa_vorname or project.ansprechpartner_vorname
project.ansprechpartner_nachname = aspa_nachname or project.ansprechpartner_nachname
project.ansprechpartner_tel = aspa_tel or project.ansprechpartner_tel
import re
_tofloat = lambda s: float(s.replace(',', '.')) if s else 0.0
positions_data = []
errors = []
in_data = False
for line in lines:
if line == '[Aufmaßdaten]':
in_data = True
continue
if in_data and '|' in line:
cols = line.split('|')
if len(cols) < 13:
continue
pos_nr = cols[1].strip()
faktor = _tofloat(cols[2])
laenge = _tofloat(cols[3])
breite = _tofloat(cols[4])
tiefe = _tofloat(cols[5])
menge_aus_cols = _tofloat(cols[6])
einheit = cols[7].strip() or 'ST'
kurztext = cols[8].strip()
bemerkung = cols[9].strip()
menge2 = _tofloat(cols[10])
einzelpreis = _tofloat(cols[11])
gesamtpreis = _tofloat(cols[12])
if not pos_nr:
# Separatorzeilen (nur Pipes) leise überspringen
if not any(cols[i].strip() for i in range(1, len(cols))):
continue
errors.append('Position ohne Nummer übersprungen.')
continue
if not kurztext:
errors.append(f'Position {pos_nr}: Kein Kurztext.')
menge = menge_aus_cols or menge2
if menge == 0.0 and einheit in ('M', 'M2', 'M3'):
if einheit == 'M':
menge = laenge
elif einheit == 'M2':
menge = laenge * breite
elif einheit == 'M3':
menge = laenge * breite * tiefe
menge *= faktor
else:
menge *= faktor
berechneter_gp = round(menge * einzelpreis, 2)
if gesamtpreis > 0 and abs(berechneter_gp - gesamtpreis) > 0.05:
errors.append(f'Position {pos_nr}: GP-Differenz {berechneter_gp} vs {gesamtpreis}')
positions_data.append({
'pos_nr': pos_nr,
'kurztext': kurztext,
'einheit': einheit,
'einzelpreis': einzelpreis,
'menge': menge,
'gesamtpreis': gesamtpreis or berechneter_gp,
'laenge': laenge,
'breite': breite,
'tiefe': tiefe,
'faktor': faktor,
'bemerkung': bemerkung,
})
if not positions_data:
flash('Keine gültigen Positionen in der Datei gefunden.', 'warning')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
# Aufmaß-Name
name_parts = [s for s in [baustelle or project.bezeichnung, bauabschnitt, sm_nr, abruf_nr] if s]
name = ' - '.join(name_parts) if name_parts else 'Importiertes Aufmaß'
name = re.sub(r'\s+', ' ', name).strip()
name = re.sub(r'[<>:"/\\|?*&#%{}~\[\]]', '', name)
# Doppelten Namen vermeiden
existing = {n[0] for n in db.session.query(Aufmass.name).filter_by(project_id=project_id).all()}
if name in existing:
i = 1
while f'{name} ({i})' in existing:
i += 1
name = f'{name} ({i})'
max_sort = db.session.query(db.func.max(Aufmass.sortierung)).filter_by(project_id=project_id).scalar() or 0
a = Aufmass(project_id=project_id, name=name, typ=typ, sortierung=max_sort + 1, erstellt_von=current_user.id)
db.session.add(a)
db.session.flush()
for idx, pd in enumerate(positions_data):
pos = Position(
project_id=project_id, aufmass_id=a.id,
pos_nr=pd['pos_nr'], sortierung=idx + 1,
kurztext=pd['kurztext'], einheit=pd['einheit'] or 'ST',
einzelpreis=pd['einzelpreis'], menge=pd['menge'],
faktor=pd['faktor'], laenge=pd['laenge'],
breite=pd['breite'], tiefe=pd['tiefe'],
gesamtpreis=pd['gesamtpreis'], formel_typ='standard',
bemerkung=pd['bemerkung'],
)
pos.menge_hinten = pos.faktor * pos.menge
db.session.add(pos)
db.session.commit()
msg = f'Aufmaß "{name}" mit {len(positions_data)} Positionen importiert.'
flash(msg, 'success')
ref = request.referrer or ''
if ref.rstrip('/').endswith('/projekt'):
return redirect(url_for('aufmass.index'))
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/import-txt', methods=['POST'])
@login_required
def positionen_import_txt(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
flash('Aufmaß nicht gefunden.', 'danger')
return redirect(url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=aufmass_id))
if 'file' not in request.files:
flash('Keine Datei ausgewählt.', 'danger')
return redirect(url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=aufmass_id))
f = request.files['file']
if f.filename == '':
flash('Keine Datei ausgewählt.', 'danger')
return redirect(url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=aufmass_id))
try:
content = f.read().decode('utf-8-sig').splitlines()
except Exception:
content = f.read().decode('latin-1').splitlines()
lines = [l.strip() for l in content if l.strip()]
import re
_tofloat = lambda s: float(s.replace(',', '.')) if s else 0.0
max_sort = db.session.query(db.func.max(Position.sortierung)).filter_by(aufmass_id=aufmass_id).scalar() or 0
positions_added = 0
errors = []
in_data = False
for line in lines:
if line == '[Aufmaßdaten]':
in_data = True
continue
if in_data and '|' in line:
cols = line.split('|')
if len(cols) < 13:
continue
pos_nr = cols[1].strip()
faktor = _tofloat(cols[2])
laenge = _tofloat(cols[3])
breite = _tofloat(cols[4])
tiefe = _tofloat(cols[5])
menge_aus_cols = _tofloat(cols[6])
einheit = cols[7].strip() or 'ST'
kurztext = cols[8].strip()
bemerkung = cols[9].strip()
menge2 = _tofloat(cols[10])
einzelpreis = _tofloat(cols[11])
gesamtpreis = _tofloat(cols[12])
if not pos_nr:
continue
menge = menge_aus_cols or menge2
if menge == 0.0 and einheit in ('M', 'M2', 'M3'):
if einheit == 'M':
menge = laenge
elif einheit == 'M2':
menge = laenge * breite
elif einheit == 'M3':
menge = laenge * breite * tiefe
menge *= faktor
else:
menge *= faktor
max_sort += 1
pos = Position(
project_id=project_id, aufmass_id=aufmass_id,
pos_nr=pos_nr, sortierung=max_sort,
kurztext=kurztext, einheit=einheit or 'ST',
einzelpreis=einzelpreis, menge=menge,
faktor=faktor, laenge=laenge,
breite=breite, tiefe=tiefe,
gesamtpreis=gesamtpreis or round(menge * einzelpreis, 2),
formel_typ='standard', bemerkung=bemerkung,
)
pos.menge_hinten = pos.faktor * pos.menge
db.session.add(pos)
positions_added += 1
db.session.commit()
msg = f'{positions_added} Positionen importiert.'
if errors:
msg += ' Hinweise: ' + '; '.join(errors[:3])
flash(msg, 'success' if not errors else 'warning')
return redirect(url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=aufmass_id))
@aufmass_bp.route('/<int:project_id>/kopfdaten-ev-holen', methods=['POST'])
@login_required
def kopfdaten_ev_holen(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
if not current_user.darf_evergabe_nutzen or not current_user.darf_kopfdaten_holen:
return jsonify({'error': 'Keine Berechtigung'}), 403
company = Company.query.get(current_user.company_id)
if not company or not company.evergabe_aktiviert:
return jsonify({'error': 'E-Vergabe nicht freigeschaltet'}), 403
if not company.evergabe_benutzer or not company.evergabe_passwort:
return jsonify({'error': 'Keine Logindaten hinterlegt'}), 400
data = request.get_json(silent=True) or {}
sm_nr = data.get('sm_nr', '').strip()
if not sm_nr:
sm_nr = project.sm_nr.strip()
if not sm_nr:
return jsonify({'error': 'Keine SM-Nr angegeben'}), 400
try:
from app.services.evergabe_service import EVergabeClient
client = EVergabeClient(
username=company.evergabe_benutzer,
password=company.evergabe_passwort,
name=company.evergabe_name or ''
)
ev_data = client.hole_kopfdaten(sm_nr)
import logging
logging.getLogger(__name__).info(f'EV-Daten: {ev_data}')
def _convert_date(d):
if not d:
return ''
parts = d.strip().split('.')
if len(parts) == 3:
return f'{parts[2]}-{parts[1]}-{parts[0]}'
return d
ap_name = ev_data.get('ansprechpartner_name', '')
ap_parts = ap_name.split()
ap_vorname = ap_parts[0] if len(ap_parts) > 1 else ''
ap_nachname = ' '.join(ap_parts[1:]) if len(ap_parts) > 1 else ap_name
return jsonify({
'bezeichnung': ev_data.get('bezeichnung', ''),
'bauabschnitt': '',
'sm_nr': ev_data.get('sm_nr', ''),
'abruf_nr': ev_data.get('abruf_nr', ''),
'datum_start': _convert_date(ev_data.get('beleg_eingang', '')),
'datum_ende': _convert_date(ev_data.get('ausfuehrungsfrist', '')),
'datum': _convert_date(ev_data.get('beleg_eingang', '')),
'ev_details_id': ev_data.get('detail_id', ''),
'ansprechpartner_vorname': ap_vorname,
'ansprechpartner_nachname': ap_nachname,
'ansprechpartner_tel': ev_data.get('ansprechpartner_tel', ''),
'ansprechpartner_email': ev_data.get('ansprechpartner_email', ''),
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@aufmass_bp.route('/<int:project_id>/aufmass/<int:aufmass_id>/evergabe-uebertragen', methods=['POST'])
@login_required
def aufmass_evergabe_uebertragen(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
if not current_user.darf_evergabe_nutzen or not current_user.darf_aufmass_uebertragen:
return jsonify({'error': 'Keine Berechtigung'}), 403
company = Company.query.get(current_user.company_id)
if not company or not company.evergabe_aktiviert:
return jsonify({'error': 'E-Vergabe nicht freigeschaltet'}), 403
if not company.evergabe_benutzer or not company.evergabe_passwort:
return jsonify({'error': 'Keine Logindaten hinterlegt'}), 400
aufmass = Aufmass.query.get_or_404(aufmass_id)
if aufmass.project_id != project_id:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
positionen = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
if not positionen:
return jsonify({'error': 'Keine Positionen vorhanden'}), 400
pos_liste = []
for p in positionen:
pos_liste.append({
'pos_nr': str(p.pos_nr or ''),
'bezeichnung': str(p.bezeichnung or ''),
'einheit': str(p.einheit or 'ST'),
'menge': str(p.menge or ''),
'faktor': str(p.faktor or '1'),
'laenge': str(getattr(p, 'laenge', '') or ''),
'breite': str(getattr(p, 'breite', '') or ''),
'tiefe': str(getattr(p, 'tiefe', '') or ''),
'langtext': str(p.langtext or ''),
'bauabschnitt': str(project.bauabschnitt or ''),
})
try:
from app.services.evergabe_service import EVergabeClient
client = EVergabeClient(
username=company.evergabe_benutzer,
password=company.evergabe_passwort,
name=company.evergabe_name or ''
)
ergebnisse = client.aufmass_uebertragen(
sm_nr=project.sm_nr,
positionen=pos_liste,
bauabschnitt=project.bauabschnitt or '',
leist_zeitv=str(project.datum_start or ''),
leist_zeitb=str(project.datum_ende or ''),
leistungsort=project.bezeichnung or '',
schlussaufmass=(aufmass.status == 'abgeschlossen'),
)
ok_count = sum(1 for e in ergebnisse if e['status'] == 'ok')
return jsonify({
'success': True,
'gesamt': len(ergebnisse),
'ok': ok_count,
'fehler': len(ergebnisse) - ok_count,
'details': ergebnisse
})
except Exception as e:
return jsonify({'error': str(e)}), 500
@aufmass_bp.route('/<int:project_id>/aufmass/<int:aufmass_id>/umbenennen', methods=['POST'])
@login_required
def aufmass_umbenennen(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
err = _lock_error(aufmass_id)
if err:
return err
data = request.get_json(silent=True) or {}
if 'name' in data:
a.name = data['name']
if 'typ' in data:
a.typ = data['typ']
if 'status' in data and data['status'] in ('aktiv', 'abgeschlossen', 'storniert'):
a.status = data['status']
db.session.commit()
return jsonify({'status': 'ok', 'name': a.name, 'typ': a.typ, 'status': a.status})
@aufmass_bp.route('/<int:project_id>/aufmass/<int:aufmass_id>/loeschen', methods=['POST'])
@login_required
def aufmass_loeschen(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
db.session.delete(a)
db.session.commit()
flash('Aufmaß gelöscht.', 'success')
ref = request.referrer or ''
if ref.rstrip('/').endswith('/projekt'):
return redirect(url_for('aufmass.index'))
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/duplizieren', methods=['POST'])
@login_required
def aufmass_duplizieren(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
orig = _get_aufmass_or_404(aufmass_id, project_id)
if not orig:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
import re
name = orig.name or 'Neues Aufmass'
name = re.sub(r'\s*\(Kopie\)\s*$', '', name)
name = name.strip() + ' (Kopie)'
max_sort = db.session.query(db.func.max(Aufmass.sortierung)).filter_by(project_id=project_id).scalar() or 0
new_a = Aufmass(
project_id=project_id, name=name, typ=orig.typ, status='aktiv',
sortierung=max_sort + 1, bemerkung=orig.bemerkung,
erstellt_von=current_user.id
)
db.session.add(new_a)
db.session.flush()
for pos in orig.positionen.order_by(Position.sortierung).all():
new_pos = Position(
project_id=project_id, aufmass_id=new_a.id,
lv_position_id=pos.lv_position_id, pos_nr=pos.pos_nr,
sortierung=pos.sortierung, rsa=pos.rsa, abschnitt=pos.abschnitt,
kurztext=pos.kurztext, langtext=pos.langtext,
einheit=pos.einheit, einzelpreis=pos.einzelpreis,
menge=pos.menge, gesamtpreis=pos.gesamtpreis,
faktor=pos.faktor, laenge=pos.laenge, breite=pos.breite, tiefe=pos.tiefe,
formel_typ=pos.formel_typ, formel=pos.formel,
bemerkung=pos.bemerkung, menge_hinten=pos.menge_hinten
)
db.session.add(new_pos)
db.session.commit()
flash(f'Aufmaß "{name}" dupliziert.', 'success')
ref = request.referrer or ''
if ref.rstrip('/').endswith('/projekt'):
return redirect(url_for('aufmass.index'))
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
# === Aufmaß-Typen verwalten ===
@aufmass_bp.route('/typen')
@login_required
def typen_liste():
if not current_user.is_firmadmin() and not current_user.is_superadmin():
flash('Keine Berechtigung.', 'danger')
return redirect(url_for('aufmass.index'))
typen = AufmassTyp.query.order_by(AufmassTyp.sortierung).all()
return render_template('aufmass/typen.html', typen=typen, titel='Aufmaß-Typen')
@aufmass_bp.route('/typen/neu', methods=['POST'])
@login_required
def typ_neu():
if not current_user.is_firmadmin() and not current_user.is_superadmin():
return jsonify({'error': 'Keine Berechtigung'}), 403
name = request.form.get('name', '').strip()
if not name:
flash('Name erforderlich.', 'danger')
return redirect(url_for('aufmass.typen_liste'))
if AufmassTyp.query.filter_by(name=name).first():
flash('Typ existiert bereits.', 'danger')
return redirect(url_for('aufmass.typen_liste'))
t = AufmassTyp(name=name, company_id=current_user.company_id,
sortierung=(db.session.query(db.func.max(AufmassTyp.sortierung)).scalar() or 0) + 1)
db.session.add(t)
db.session.commit()
flash(f'Typ "{name}" angelegt.', 'success')
return redirect(url_for('aufmass.typen_liste'))
@aufmass_bp.route('/typen/<int:typ_id>/edit', methods=['POST'])
@login_required
def typ_edit(typ_id):
if not current_user.is_firmadmin() and not current_user.is_superadmin():
flash('Keine Berechtigung.', 'danger')
return redirect(url_for('aufmass.typen_liste'))
t = AufmassTyp.query.get_or_404(typ_id)
name = request.form.get('name', '').strip()
if name:
t.name = name
db.session.commit()
flash('Typ aktualisiert.', 'success')
return redirect(url_for('aufmass.typen_liste'))
@aufmass_bp.route('/typen/<int:typ_id>/loeschen', methods=['POST'])
@login_required
def typ_loeschen(typ_id):
if not current_user.is_firmadmin() and not current_user.is_superadmin():
flash('Keine Berechtigung.', 'danger')
return redirect(url_for('aufmass.typen_liste'))
t = AufmassTyp.query.get_or_404(typ_id)
db.session.delete(t)
db.session.commit()
flash('Typ gelöscht.', 'success')
return redirect(url_for('aufmass.typen_liste'))
# === Editor (Positionen verwalten) ===
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>')
@login_required
def bearbeiten(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'lesen'):
return 'Zugriff verweigert', 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
flash('Aufmaß nicht gefunden.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
positionen = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
lv_names = db.session.query(LVPosition.lv_name).filter_by(
company_id=current_user.company_id
).distinct().order_by(LVPosition.lv_name).all()
lv_names = [r[0] for r in lv_names]
lv_positionen = []
if project.lv_name:
lv_positionen = LVPosition.query.filter_by(
company_id=current_user.company_id,
lv_name=project.lv_name
).order_by(LVPosition.order_index).all()
modules = get_aktive_module(current_user.company_id, user=current_user)
custom_modules = []
all_cm = CustomModule.query.filter(
CustomModule.is_active == True,
CustomModule.is_template == False,
CustomModule.company_id == current_user.company_id
).order_by(CustomModule.sort_index).all()
for cm in all_cm:
if current_user.is_superadmin() or current_user.is_firmadmin():
custom_modules.append(cm)
elif CustomModuleAssignment.query.filter_by(
module_id=cm.id, user_id=current_user.id
).first():
custom_modules.append(cm)
hidden_mods = current_user.get_hidden_modules()
hidden_set = {(h['type'], str(h['id'])) for h in hidden_mods}
# Lock-Prüfung
locked, holder_id = a.is_locked()
locked_by_name = None
if locked and holder_id == current_user.id:
a.refresh_lock(current_user.id)
db.session.commit()
elif locked:
lh = User.query.get(holder_id)
locked_by_name = lh.full_name if lh else 'Unbekannt'
else:
a.try_lock(current_user.id)
db.session.commit()
return render_template('aufmass/bearbeiten.html',
project=project, aufmass=a,
positionen=positionen,
lv_names=lv_names,
lv_positionen=lv_positionen,
modules=modules,
custom_modules=custom_modules,
hidden_set=hidden_set,
locked_by_name=locked_by_name,
titel=f'Aufmaß {project.sm_nr} {a.name}')
# === Lock-Management ===
def _lock_error(aufmass_id):
a = Aufmass.query.get(aufmass_id)
if not a:
return None
locked, holder_id = a.is_locked()
if locked and holder_id != current_user.id:
holder = User.query.get(holder_id)
name = holder.full_name if holder else 'Unbekannt'
return jsonify({'locked': True, 'holder': name}), 423
return None
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/lock', methods=['POST'])
@login_required
def lock_aufmass(project_id, aufmass_id):
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
locked, holder_id = a.is_locked()
if locked and holder_id != current_user.id:
holder = User.query.get(holder_id)
return jsonify({'locked': True, 'holder': holder.full_name if holder else 'Unbekannt'}), 423
a.try_lock(current_user.id)
db.session.commit()
return jsonify({'locked': False, 'ok': True})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/unlock', methods=['POST'])
@login_required
def unlock_aufmass(project_id, aufmass_id):
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
if a.locked_by == current_user.id:
a.unlock()
db.session.commit()
return jsonify({'ok': True})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/heartbeat', methods=['POST'])
@login_required
def heartbeat_aufmass(project_id, aufmass_id):
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
a.refresh_lock(current_user.id)
db.session.commit()
return jsonify({'ok': True})
# === Position CRUD (innerhalb eines Aufmaßes) ===
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/position/hinzufuegen', methods=['POST'])
@login_required
def position_hinzufuegen(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return 'Zugriff verweigert', 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return 'Aufmaß nicht gefunden', 404
err = _lock_error(aufmass_id)
if err:
return err
lv_pos_id = request.form.get('lv_position_id')
if lv_pos_id:
lv_pos = LVPosition.query.get(int(lv_pos_id))
if not lv_pos or lv_pos.company_id != current_user.company_id:
return 'Ungültige LV-Position', 400
max_sort = db.session.query(db.func.max(Position.sortierung)).filter_by(
aufmass_id=aufmass_id
).scalar() or 0
pos = Position(
project_id=project_id, aufmass_id=aufmass_id,
lv_position_id=lv_pos.id,
pos_nr=lv_pos.pos_nr,
sortierung=max_sort + 1,
rsa=lv_pos.rsa,
abschnitt=lv_pos.abschnitt,
kurztext=lv_pos.kurztext,
langtext=lv_pos.langtext,
einheit=lv_pos.einheit,
einzelpreis=lv_pos.einzelpreis,
faktor=1.0, laenge=0, breite=0, tiefe=0,
menge=0, gesamtpreis=0, formel_typ='standard',
)
pos.berechne_menge()
else:
max_sort = db.session.query(db.func.max(Position.sortierung)).filter_by(
aufmass_id=aufmass_id
).scalar() or 0
pos = Position(
project_id=project_id, aufmass_id=aufmass_id,
pos_nr=request.form.get('pos_nr', ''),
sortierung=max_sort + 1,
kurztext=request.form.get('kurztext', ''),
einheit=request.form.get('einheit', 'ST'),
einzelpreis=float(request.form.get('einzelpreis', 0)),
faktor=1.0, laenge=0, breite=0, tiefe=0,
menge=0, gesamtpreis=0, formel_typ='standard',
)
db.session.add(pos); db.session.commit()
return redirect(url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=aufmass_id))
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/position/<int:pos_id>/aktualisieren', methods=['POST'])
@login_required
def position_aktualisieren(project_id, aufmass_id, pos_id):
pos = Position.query.get_or_404(pos_id)
if pos.project.company_id != current_user.company_id:
return 'Zugriff verweigert', 403
if not _check_zugriff(pos.project, 'schreiben'):
return 'Zugriff verweigert', 403
err = _lock_error(aufmass_id)
if err:
return err
from app.utils import capture_diff
new_state = {
'pos_nr': request.form.get('pos_nr', pos.pos_nr),
'kurztext': request.form.get('kurztext', pos.kurztext),
'einheit': request.form.get('einheit', pos.einheit),
'einzelpreis': float(request.form.get('einzelpreis', pos.einzelpreis)),
'faktor': float(request.form.get('faktor', pos.faktor)),
'laenge': float(request.form.get('laenge', pos.laenge)),
'breite': float(request.form.get('breite', pos.breite)),
'tiefe': float(request.form.get('tiefe', pos.tiefe)),
'bemerkung': request.form.get('bemerkung', pos.bemerkung),
'abschnitt': request.form.get('abschnitt', pos.abschnitt),
'formel_typ': request.form.get('formel_typ', pos.formel_typ or 'standard'),
'formel': request.form.get('formel', pos.formel)
}
diff = capture_diff(pos, new_state)
for k, v in new_state.items():
setattr(pos, k, v)
pos.berechne_menge()
from app.models.aufmass_history import AufmassHistory
hist = AufmassHistory(aufmass_id=aufmass_id, changed_by=current_user.id, action='change', diff=json.dumps(diff))
db.session.add(hist)
db.session.commit()
return redirect(url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=aufmass_id))
FIELDS = {'pos_nr','kurztext','einheit','einzelpreis','faktor','laenge','breite','tiefe','menge','abschnitt','bemerkung','formel_typ','formel','menge_hinten'}
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/position/<int:pos_id>/update-cell', methods=['POST'])
@login_required
def position_update_cell(project_id, aufmass_id, pos_id):
pos = Position.query.get_or_404(pos_id)
if pos.project.company_id != current_user.company_id:
return jsonify({'error': 'Zugriff verweigert'}), 403
if not _check_zugriff(pos.project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
err = _lock_error(aufmass_id)
if err:
return err
data = request.get_json() or {}
field = data.get('field')
value = data.get('value')
if field not in FIELDS:
return jsonify({'error': 'Ungültiges Feld'}), 400
old_val = getattr(pos, field)
setattr(pos, field, value)
if field in ('faktor','laenge','breite','tiefe','formel_typ','formel','einheit','menge','menge_hinten'):
pos.berechne_menge(recalc_hinten=(field != 'menge_hinten'), skip_menge_recalc=(field == 'menge'))
# Create history entry for inline cell edit
field_labels = {
'kurztext': 'Kurztext', 'formel_typ': 'Z-Art', 'einheit': 'EH',
'einzelpreis': 'EP', 'faktor': 'Faktor', 'laenge': 'Länge', 'breite': 'Breite',
'tiefe': 'Tiefe', 'abschnitt': 'Abschnitt', 'bemerkung': 'Bemerkung',
'menge_hinten': 'Menge hinten', 'menge': 'Menge', 'formel': 'Formel'
}
old_str = str(old_val) if old_val is not None else 'leer'
new_str = str(value) if value is not None else 'leer'
if old_str != new_str:
desc = f"✎ Pos {pos.pos_nr or pos.id}: {field_labels.get(field, field)}: {old_str}{new_str}"
hist = AufmassHistory(
aufmass_id=aufmass_id,
position_id=pos.id,
changed_by=current_user.id,
action='change',
description=desc,
diff=json.dumps([{
'position_id': pos.id,
'pos_nr': pos.pos_nr or '',
'kurztext': pos.kurztext or '',
'row_idx': pos.sortierung - 1,
'old': {field: old_val},
'new': {field: value}
}])
)
db.session.add(hist)
db.session.commit()
resp = {'status':'ok','menge':pos.menge,'gesamtpreis':pos.gesamtpreis,'menge_hinten':pos.menge_hinten}
if field == 'formel_typ':
resp['formel_typ_changed'] = True
return jsonify(resp)
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/position/<int:pos_id>/loeschen', methods=['POST'])
@login_required
def position_loeschen(project_id, aufmass_id, pos_id):
pos = Position.query.get_or_404(pos_id)
if pos.project.company_id != current_user.company_id:
return 'Zugriff verweigert', 403
if not _check_zugriff(pos.project, 'schreiben'):
return 'Zugriff verweigert', 403
err = _lock_error(aufmass_id)
if err:
return err
db.session.delete(pos)
db.session.commit()
return redirect(url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=aufmass_id))
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/leeren', methods=['POST'])
@login_required
def positionen_leeren(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
err = _lock_error(aufmass_id)
if err:
return err
positions = Position.query.filter_by(aufmass_id=aufmass_id).all()
changes = []
for p in positions:
changes.append({
'position_id': p.id,
'pos_nr': p.pos_nr or '',
'old': {
'id': p.id, 'lv_pos_id': p.lv_position_id, 'pos_nr': p.pos_nr or '',
'kurztext': p.kurztext or '', 'faktor': p.faktor, 'laenge': p.laenge,
'breite': p.breite, 'tiefe': p.tiefe, 'menge': p.menge,
'einheit': p.einheit or '', 'formel_typ': p.formel_typ or 'standard',
'formel': p.formel or '', 'abschnitt': p.abschnitt or '',
'bemerkung': p.bemerkung or '', 'sort_order': p.sortierung
},
'new': {'_action': 'delete', 'id': p.id}
})
Position.query.filter_by(aufmass_id=aufmass_id).delete()
if changes:
hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='change',
description=f'Alle {len(changes)} Position(en) gelöscht',
diff=json.dumps(changes)
)
db.session.add(hist)
db.session.commit()
return jsonify({'status': 'ok'})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/batch-loeschen', methods=['POST'])
@login_required
def positionen_batch_loeschen(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
err = _lock_error(aufmass_id)
if err:
return err
data = request.get_json() or {}
ids = data.get('ids', [])
if not ids:
return jsonify({'error': 'Keine IDs'}), 400
positions = Position.query.filter(Position.id.in_(ids), Position.aufmass_id == aufmass_id).all()
changes = []
for p in positions:
changes.append({
'position_id': p.id,
'pos_nr': p.pos_nr or '',
'old': {
'id': p.id, 'lv_pos_id': p.lv_position_id, 'pos_nr': p.pos_nr or '',
'kurztext': p.kurztext or '', 'faktor': p.faktor, 'laenge': p.laenge,
'breite': p.breite, 'tiefe': p.tiefe, 'menge': p.menge,
'einheit': p.einheit or '', 'formel_typ': p.formel_typ or 'standard',
'formel': p.formel or '', 'abschnitt': p.abschnitt or '',
'bemerkung': p.bemerkung or '', 'sort_order': p.sortierung
},
'new': {'_action': 'delete', 'id': p.id}
})
Position.query.filter(Position.id.in_(ids), Position.aufmass_id == aufmass_id).delete(synchronize_session=False)
if changes:
hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='change',
description=f'{len(changes)} Position(en) gelöscht',
diff=json.dumps(changes)
)
db.session.add(hist)
db.session.commit()
return jsonify({'status': 'ok'})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/kopieren', methods=['POST'])
@login_required
def positionen_kopieren(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
data = request.get_json() or {}
ids = data.get('ids', [])
insert_idx = data.get('insert_idx')
if not ids:
return jsonify({'error': 'Keine IDs'}), 400
originals = Position.query.filter(Position.id.in_(ids), Position.aufmass_id == aufmass_id).order_by(Position.sortierung).all()
if not originals:
return jsonify({'error': 'Positionen nicht gefunden'}), 404
existing = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
if insert_idx is not None and insert_idx >= 0:
if insert_idx > len(existing): insert_idx = len(existing)
sort_base = insert_idx + 1
for i, p in enumerate(existing):
new_sort = i + 1
if new_sort >= sort_base:
p.sortierung = new_sort + len(originals)
db.session.flush()
else:
max_sort = db.session.query(db.func.max(Position.sortierung)).filter_by(aufmass_id=aufmass_id).scalar() or 0
sort_base = max_sort + 1
added = []
added_details = []
for orig in originals:
copy = Position(
project_id=project_id, aufmass_id=aufmass_id,
lv_position_id=orig.lv_position_id,
pos_nr=orig.pos_nr, sortierung=sort_base,
rsa=orig.rsa or '', abschnitt=orig.abschnitt or '',
kurztext=orig.kurztext or '', langtext=orig.langtext or '',
einheit=orig.einheit or '', einzelpreis=orig.einzelpreis or 0,
faktor=orig.faktor, laenge=orig.laenge, breite=orig.breite, tiefe=orig.tiefe,
menge=orig.menge, gesamtpreis=orig.gesamtpreis,
formel_typ=orig.formel_typ or 'standard', formel=orig.formel or '',
bemerkung=orig.bemerkung or '',
menge_hinten=orig.menge_hinten or 0,
)
db.session.add(copy)
db.session.flush()
added.append(copy.id)
added_details.append({'old_id': orig.id, 'new_id': copy.id, 'pos_nr': copy.pos_nr})
sort_base += 1
hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='change',
description=f'{len(added)} Position(en) kopiert',
diff=json.dumps([{
'position_id': d['new_id'],
'pos_nr': d['pos_nr'],
'old': {'id': d['old_id']},
'new': {'_action': 'copy', 'old_id': d['old_id'], 'new_id': d['new_id']}
} for d in added_details])
)
db.session.add(hist)
db.session.commit()
return jsonify({'status': 'ok', 'added': added})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/reihenfolge', methods=['POST'])
@login_required
def positionen_reihenfolge(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
data = request.get_json()
if not data or 'reihenfolge' not in data:
return jsonify({'error': 'Keine Daten'}), 400
for item in data['reihenfolge']:
pos = Position.query.get(item['id'])
if pos and pos.aufmass_id == aufmass_id:
pos.sortierung = item['sortierung']
db.session.commit()
return jsonify({'status': 'ok'})
# === Projekt-Level Operationen (kein spezifisches Aufmaß) ===
@aufmass_bp.route('/<int:project_id>/status', methods=['POST'])
@login_required
def status_aendern(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return 'Zugriff verweigert', 403
project.status = request.form.get('status', project.status)
db.session.commit()
flash(f'Status geändert auf {project.status}.', 'success')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
@aufmass_bp.route('/<int:project_id>/loeschen', methods=['POST'])
@login_required
def project_loeschen(project_id):
project = Project.query.get_or_404(project_id)
if project.company_id != current_user.company_id:
return 'Zugriff verweigert', 403
if not _check_zugriff(project, 'schreiben'):
return 'Zugriff verweigert', 403
db.session.delete(project)
db.session.commit()
flash('Projekt gelöscht.', 'success')
return redirect(url_for('aufmass.index'))
@aufmass_bp.route('/<int:project_id>/lv/search')
@login_required
def lv_search(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'lesen'):
return jsonify([]), 403
q = request.args.get('q', '').strip()
base = LVPosition.query.filter_by(company_id=current_user.company_id, lv_name=project.lv_name)
if q:
like = f'%{q}%'
base = base.filter(db.or_(LVPosition.pos_nr.like(like), LVPosition.kurztext.like(like), LVPosition.langtext.like(like)))
pos = base.order_by(LVPosition.order_index).limit(200).all()
return jsonify([{
'id': p.id, 'pos_nr': p.pos_nr, 'kurztext': p.kurztext,
'langtext': p.langtext, 'einheit': p.einheit, 'einzelpreis': p.einzelpreis,
'rsa': p.rsa or '', 'abschnitt': p.abschnitt or '',
} for p in pos])
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/batch-add', methods=['POST'])
@login_required
def positionen_batch_add(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
err = _lock_error(aufmass_id)
if err:
return err
data = request.get_json() or {}
ids = data.get('ids', [])
if not ids:
return jsonify({'error': 'Keine IDs'}), 400
insert_idx = data.get('insert_idx')
if insert_idx is not None:
insert_idx = int(insert_idx)
existing = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
if insert_idx < 0: insert_idx = 0
if insert_idx > len(existing): insert_idx = len(existing)
sort_base = insert_idx + 1
for i, p in enumerate(existing):
new_sort = i + 1
if new_sort >= sort_base:
new_sort += len(ids)
p.sortierung = new_sort
db.session.flush()
else:
max_sort = db.session.query(db.func.max(Position.sortierung)).filter_by(aufmass_id=aufmass_id).scalar() or 0
sort_base = max_sort + 1
overrides = data.get('overrides', {})
added_objs = []
added = []
for lv_id in ids:
lv_pos = LVPosition.query.get(int(lv_id))
if not lv_pos or lv_pos.company_id != current_user.company_id:
continue
pos = Position(
project_id=project_id, aufmass_id=aufmass_id,
lv_position_id=lv_pos.id,
pos_nr=overrides.get('pos_nr', lv_pos.pos_nr),
sortierung=sort_base,
rsa=lv_pos.rsa or '', abschnitt=overrides.get('abschnitt', lv_pos.abschnitt or ''),
kurztext=overrides.get('kurztext', lv_pos.kurztext or ''),
langtext=lv_pos.langtext or '',
einheit=lv_pos.einheit or '',
einzelpreis=lv_pos.einzelpreis or 0,
faktor=overrides.get('faktor', 1.0),
laenge=overrides.get('laenge', 0),
breite=overrides.get('breite', 0),
tiefe=overrides.get('tiefe', 0),
menge=overrides.get('menge', 0),
gesamtpreis=overrides.get('gesamtpreis', 0),
formel_typ=overrides.get('formel_typ', 'standard'),
formel=overrides.get('formel', ''),
bemerkung=overrides.get('bemerkung', ''),
menge_hinten=overrides.get('menge_hinten', 0),
)
if overrides:
pos.berechne_menge()
else:
pos.menge_hinten = pos.faktor * pos.menge
pos.gesamtpreis = pos.menge_hinten * pos.einzelpreis
db.session.add(pos)
added_objs.append(pos)
added.append({'id': pos.id, 'pos_nr': pos.pos_nr, 'kurztext': pos.kurztext})
sort_base += 1
hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='change',
description=f'{len(added_objs)} Position(en) aus LV hinzugefügt',
diff=json.dumps([{
'position_id': p.id,
'pos_nr': p.pos_nr,
'kurztext': p.kurztext or '',
'row_idx': added_objs.index(p),
'old': {'id': None},
'new': {'_action': 'add', 'id': p.id}
} for p in added_objs])
)
db.session.add(hist)
db.session.commit()
return jsonify({'status': 'ok', 'added': added})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/batch-edit', methods=['POST'])
@login_required
def positionen_batch_edit(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
err = _lock_error(aufmass_id)
if err:
return err
data = request.get_json(force=True, silent=True)
if not data:
return jsonify({'error': 'Keine Daten (data is None)', 'raw': request.data[:200].decode('utf-8', errors='replace')}), 400
if 'positions' not in data:
return jsonify({'error': 'Keine positions in data', 'keys': list(data.keys())}), 400
FIELDS = {'kurztext','formel_typ','einheit','einzelpreis','faktor','laenge','breite','tiefe','abschnitt','bemerkung','menge_hinten','menge','formel'}
changes = []
for pos_id, updates in data['positions'].items():
pos = Position.query.get(int(pos_id))
if not pos or pos.aufmass_id != aufmass_id or pos.project.company_id != current_user.company_id:
continue
old_values = {}
for field, value in updates.items():
if field not in FIELDS:
continue
old_values[field] = getattr(pos, field)
setattr(pos, field, value)
if old_values:
row_idx = pos.sortierung - 1
changes.append({
'position_id': pos.id,
'pos_nr': pos.pos_nr,
'kurztext': pos.kurztext or '',
'row_idx': row_idx,
'old': old_values,
'new': updates
})
ft = updates.get('formel_typ') or pos.formel_typ or 'standard'
skip_me = ('menge' in updates) and ft != 'frei'
pos.berechne_menge(recalc_hinten=('menge_hinten' not in updates), skip_menge_recalc=skip_me)
if changes:
field_labels = {
'kurztext': 'Kurztext', 'formel_typ': 'Z-Art', 'einheit': 'EH',
'einzelpreis': 'EP', 'faktor': 'Faktor', 'laenge': 'Länge', 'breite': 'Breite',
'tiefe': 'Tiefe', 'abschnitt': 'Abschnitt', 'bemerkung': 'Bemerkung',
'menge_hinten': 'Menge hinten', 'menge': 'Menge', 'formel': 'Formel'
}
desc_parts = []
long_parts = []
for c in changes:
row_label = f"Zeile {c['row_idx']+1}" if c.get('row_idx') is not None else "?"
pos_label = f"{row_label}: Pos {c['pos_nr'] or '#'+str(c['position_id'])}"
kurztext = c.get('kurztext', '')[:20]
if kurztext:
kurztext = f' "{kurztext}"'
else:
kurztext = ''
changes_detail = []
for field in c['new']:
old_val = c['old'].get(field)
new_val = c['new'].get(field)
if old_val is None and new_val is None:
continue
old_str = str(old_val) if old_val is not None else 'leer'
new_str = str(new_val) if new_val is not None else 'leer'
if old_str != new_str:
label = field_labels.get(field, field)
changes_detail.append(f'{label}: {old_str}{new_str}')
if changes_detail:
short = f"{pos_label}: {', '.join(changes_detail[:2])}"
if len(changes_detail) > 2:
short += f' (+{len(changes_detail)-2})'
desc_parts.append(short)
long_parts.append(f"{pos_label}{kurztext}\n " + '\n '.join(changes_detail))
description = ', '.join(desc_parts[:3])
if len(desc_parts) > 3:
description += f' (+{len(desc_parts)-3} weitere)'
hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='change',
description=description,
diff=json.dumps(changes)
)
db.session.add(hist)
db.session.commit()
return jsonify({'ok': True})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/position/leer', methods=['POST'])
@login_required
def position_leer(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
err = _lock_error(aufmass_id)
if err:
return err
data = request.get_json(silent=True) or {}
insert_idx = data.get('insert_idx')
if insert_idx is not None:
insert_idx = int(insert_idx)
existing = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
if insert_idx < 0: insert_idx = 0
if insert_idx > len(existing): insert_idx = len(existing)
sort_base = insert_idx + 1
for i, p in enumerate(existing):
new_sort = i + 1
if new_sort >= sort_base:
p.sortierung = new_sort + 1
db.session.flush()
else:
max_sort = db.session.query(db.func.max(Position.sortierung)).filter_by(aufmass_id=aufmass_id).scalar() or 0
sort_base = max_sort + 1
pos = Position(
project_id=project_id, aufmass_id=aufmass_id,
pos_nr='', sortierung=sort_base,
einheit='', faktor=0, laenge=0, breite=0, tiefe=0,
menge=0, einzelpreis=0, gesamtpreis=0,
formel_typ='standard', menge_hinten=0,
)
db.session.add(pos)
db.session.flush()
hist = AufmassHistory(
aufmass_id=aufmass_id,
position_id=pos.id,
changed_by=current_user.id,
action='change',
description=f'Leere Zeile hinzugefügt (Pos #{pos.id})',
diff=json.dumps([{
'position_id': pos.id,
'pos_nr': '',
'old': {'id': None},
'new': {'_action': 'add_empty', 'id': pos.id}
}])
)
db.session.add(hist)
db.session.commit()
return jsonify({'status': 'ok', 'id': pos.id, 'pos_nr': pos.pos_nr})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen')
@login_required
def positionen_liste(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'lesen'):
return jsonify([]), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify([]), 404
pos = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
return jsonify([{
'id': p.id, 'pos_nr': p.pos_nr, 'sortierung': p.sortierung,
'rsa': p.rsa or '', 'abschnitt': p.abschnitt or '',
'kurztext': p.kurztext, 'langtext': p.langtext,
'einheit': p.einheit, 'einzelpreis': p.einzelpreis,
'menge': p.menge, 'gesamtpreis': p.gesamtpreis,
'faktor': p.faktor, 'laenge': p.laenge, 'breite': p.breite, 'tiefe': p.tiefe,
'bemerkung': p.bemerkung or '',
'formel_typ': p.formel_typ or 'standard', 'formel': p.formel or '',
'menge_hinten': p.menge_hinten or 0,
} for p in pos])
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/farben')
@login_required
def positionen_farben(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'lesen'):
return jsonify({}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({}), 404
pos = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
lv_pos_nrs = set()
if project.lv_name:
lvps = LVPosition.query.filter_by(company_id=current_user.company_id, lv_name=project.lv_name).all()
lv_pos_nrs = {lp.pos_nr for lp in lvps}
result = {}
for p in pos:
color_map = {}
is_empty = not p.pos_nr and p.einheit in ('', None)
if is_empty:
color_map['all'] = '#F7CA14'
else:
e = p.einheit or ''
m = p.menge or 0
f = p.faktor or 0
l = p.laenge or 0
b = p.breite or 0
t = p.tiefe or 0
if e in ('ST', 'LE', 'STD', 'h', 'Psch') and m == 0:
color_map['menge'] = '#ED686B'
if e == 'M' and l == 0:
color_map['laenge'] = '#ED686B'
if e == 'M2':
if l == 0: color_map['laenge'] = '#ED686B'
if b == 0: color_map['breite'] = '#ED686B'
if e in ('M3', 't'):
if l == 0: color_map['laenge'] = '#ED686B'
if b == 0: color_map['breite'] = '#ED686B'
if t == 0: color_map['tiefe'] = '#ED686B'
if p.pos_nr and p.pos_nr not in lv_pos_nrs:
color_map['pos_nr'] = '#3370AD'
result[str(p.id)] = color_map
return jsonify(result)
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/positionen/batch-reorder', methods=['POST'])
@login_required
def positionen_batch_reorder(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
err = _lock_error(aufmass_id)
if err:
return err
try:
data = request.get_json() or {}
except Exception:
return jsonify({'error': 'Invalid JSON'}), 400
ids = data.get('ids', [])
if not ids:
return jsonify({'error': 'Keine IDs'}), 400
try:
ids = [int(i) for i in ids]
except (ValueError, TypeError):
return jsonify({'error': 'Invalid IDs'}), 400
insert_idx = data.get('insert_idx')
existing = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
if not existing:
return jsonify({'status': 'ok', 'changes': []})
moved_set = set(ids)
remaining = [p for p in existing if p.id not in moved_set]
if insert_idx is None:
new_order = remaining + [p for p in existing if p.id in moved_set]
elif insert_idx < 0 or insert_idx >= len(remaining) + len(moved_set):
new_order = remaining + [p for p in existing if p.id in moved_set]
else:
new_order = remaining[:insert_idx] + [p for p in existing if p.id in moved_set] + remaining[insert_idx:]
changes = []
pos_list_for_desc = []
for i, p in enumerate(new_order):
old_idx = p.sortierung
new_idx = i + 1
if old_idx != new_idx:
p.sortierung = new_idx
changes.append({
'position_id': p.id,
'pos_nr': p.pos_nr or '',
'row_idx': i,
'old_idx': old_idx,
'new_idx': new_idx,
'kurztext': p.kurztext or '',
'old': {'sort_order': old_idx},
'new': {'_action': 'reorder', 'sort_order': new_idx, 'old_idx': old_idx}
})
pos_list_for_desc.append({
'position_id': p.id,
'pos_nr': p.pos_nr or '',
'kurztext': p.kurztext or '',
'row_idx': i,
'old_idx': old_idx,
'new_idx': new_idx
})
else:
p.sortierung = new_idx
if changes:
try:
short_desc, long_desc = _build_history_description('move', pos_list_for_desc, current_user.name or current_user.email)
if not short_desc:
short_desc = f'{len(changes)} Position(en) verschoben'
except Exception:
short_desc = f'{len(changes)} Position(en) verschoben'
hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='change',
description=short_desc,
diff=json.dumps(changes)
)
db.session.add(hist)
db.session.commit()
return jsonify({'status': 'ok', 'changes': changes})
@aufmass_bp.route('/<int:project_id>/lv-set', methods=['POST'])
@login_required
def project_lv_set(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
project.lv_name = request.form.get('lv_name', '').strip()
contract_id = request.form.get('contract_id', type=int)
if contract_id:
project.contract_id = contract_id
contract = Contract.query.get(contract_id)
if contract:
project.vertrag = contract.name
db.session.commit()
first_aufmass = Aufmass.query.filter_by(project_id=project_id).order_by(Aufmass.sortierung).first()
if first_aufmass:
return redirect(url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=first_aufmass.id))
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
@aufmass_bp.route('/<int:project_id>/kopfdaten', methods=['POST'])
@login_required
def project_kopfdaten_save(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
project.vertrag = request.form.get('vertrag', '').strip()
project.lv_name = request.form.get('lv_name', '').strip()
project.bezeichnung = request.form.get('bezeichnung', '').strip()
project.baustelle = request.form.get('baustelle', '').strip()
project.bauabschnitt = request.form.get('bauabschnitt', '').strip()
project.sm_nr = request.form.get('sm_nr', '').strip()
project.abruf_nr = request.form.get('abruf_nr', '').strip()
project.datum_start = _parse_date(request.form.get('datum_start'))
project.datum_ende = _parse_date(request.form.get('datum_ende'))
project.ansprechpartner_vorname = request.form.get('ansprechpartner_vorname', '').strip()
project.ansprechpartner_nachname = request.form.get('ansprechpartner_nachname', '').strip()
project.ansprechpartner_tel = request.form.get('ansprechpartner_tel', '').strip()
project.ansprechpartner_email = request.form.get('ansprechpartner_email', '').strip()
aufmass_id = request.form.get('aufmass_id', type=int)
if aufmass_id:
aufmass = Aufmass.query.get(aufmass_id)
if aufmass and aufmass.project_id == project_id:
aufmass.typ = request.form.get('typ', '').strip()
aufmass.datum = _parse_date(request.form.get('datum'))
if request.form.get('datum'):
project.datum = _parse_date(request.form.get('datum'))
db.session.commit()
flash('Kopfdaten gespeichert.', 'success')
return redirect(request.referrer or url_for('aufmass.bearbeiten', project_id=project_id, aufmass_id=aufmass_id or 0))
@aufmass_bp.route('/<int:project_id>/update-name', methods=['POST'])
@login_required
def project_update_name(project_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
name = request.form.get('name', '').strip()
if not name:
return jsonify({'error': 'Name darf nicht leer sein'}), 400
import re
invalid_chars = r'[<>:"/\\|?*&#%{}~\[\]]'
if re.search(invalid_chars, name):
return jsonify({'error': 'Name enthält ungültige Zeichen'}), 400
project.bezeichnung = name
db.session.commit()
return jsonify({'name': name})
@aufmass_bp.route('/module-reorder', methods=['POST'])
@login_required
def module_reorder():
data = request.get_json(silent=True)
if not data or 'order' not in data:
return jsonify({'error': 'Keine Daten'}), 400
from app.models.module import Module
from app.models.custom_module import CustomModule
for i, item in enumerate(data['order']):
if item.get('type') == 'module':
mod = Module.query.get(int(item['id']))
if mod:
mod.sortierung = i
elif item.get('type') == 'custom':
cm = CustomModule.query.get(int(item['id']))
if cm and cm.company_id == current_user.company_id:
cm.sort_index = i
db.session.commit()
return jsonify({'ok': True})
@aufmass_bp.route('/module-toggle-hidden', methods=['POST'])
@login_required
def module_toggle_hidden():
data = request.get_json(silent=True)
if not data or 'type' not in data or 'id' not in data:
return jsonify({'error': 'Ungültige Daten'}), 400
hidden = current_user.get_hidden_modules()
entry = {'type': data['type'], 'id': data['id']}
found = False
for h in hidden:
if h['type'] == entry['type'] and h['id'] == entry['id']:
hidden.remove(h)
found = True
break
if not found:
hidden.append(entry)
current_user.set_hidden_modules(hidden)
db.session.commit()
return jsonify({'hidden': found, 'type': data['type'], 'id': data['id']})
@aufmass_bp.route('/module-toggle-hidden-batch', methods=['POST'])
@login_required
def module_toggle_hidden_batch():
data = request.get_json(silent=True)
if not data or 'entries' not in data or 'action' not in data:
return jsonify({'error': 'Ungültige Daten'}), 400
hidden = current_user.get_hidden_modules()
action = data['action'] # 'hide' or 'show'
for entry in data['entries']:
key = {'type': entry['type'], 'id': str(entry['id'])}
found = False
for h in hidden:
if h['type'] == key['type'] and h['id'] == key['id']:
if action == 'show':
hidden.remove(h)
found = True
break
if not found and action == 'hide':
hidden.append(key)
current_user.set_hidden_modules(hidden)
db.session.commit()
return jsonify({'ok': True, 'action': action, 'count': len(data['entries'])})
def _parse_date(s):
if not s:
return None
for fmt in ('%Y-%m-%d', '%d.%m.%Y', '%Y.%m.%d'):
try:
return datetime.strptime(s.strip(), fmt).date()
except ValueError:
continue
return None
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/undo', methods=['POST'])
@login_required
def undo(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
last_change = (AufmassHistory.query
.filter_by(aufmass_id=aufmass_id, action='change')
.order_by(AufmassHistory.id.desc())
.first())
if not last_change:
return jsonify({'error': 'Nichts zum Rückgängig machen'}), 400
changes = json.loads(last_change.diff)
for change in changes:
pos = Position.query.get(change['position_id'])
if not pos:
continue
new_data = change.get('new', {})
old_data = change.get('old', {})
if new_data.get('_action') in ('add', 'add_empty', 'add_form'):
db.session.delete(pos)
elif new_data.get('_action') == 'delete':
pos_id = change.get('position_id')
pos_deleted = Position.query.get(pos_id)
if not pos_deleted:
lv_id = old_data.get('lv_pos_id') if old_data.get('lv_pos_id') else None
pos_deleted = Position(
aufmass_id=aufmass_id,
lv_pos_id=lv_id,
pos_nr=old_data.get('pos_nr', ''),
kurztext=old_data.get('kurztext', ''),
faktor=old_data.get('faktor', 1.0),
laenge=old_data.get('laenge', 0),
breite=old_data.get('breite', 0),
tiefe=old_data.get('tiefe', 0),
menge=old_data.get('menge', 0),
einheit=old_data.get('einheit', ''),
formel_typ=old_data.get('formel_typ', 'standard'),
formel=old_data.get('formel', ''),
abschnitt=old_data.get('abschnitt', ''),
bemerkung=old_data.get('bemerkung', ''),
sort_order=old_data.get('sort_order', 0)
)
db.session.add(pos_deleted)
else:
for field, val in old_data.items():
if field not in ('_action', 'id'):
setattr(pos_deleted, field, val)
db.session.add(pos_deleted)
elif new_data.get('_action') == 'copy':
new_pos = Position.query.get(new_data.get('new_id'))
if new_pos:
db.session.delete(new_pos)
elif new_data.get('_action') == 'reorder':
# For reorder undo, we need to restore the old order of ALL positions
# The old order is stored in row_idx and order in new_data
old_order = new_data.get('order', [])
if old_order:
# Get all positions for this aufmass and reorder them
all_pos = Position.query.filter_by(aufmass_id=aufmass_id).order_by(Position.sortierung).all()
for i, pos_id in enumerate(old_order):
pos = Position.query.get(pos_id)
if pos:
pos.sortierung = i + 1
else:
# Fallback: just restore sort_order from old data
old_sort = old_data.get('sort_order')
if old_sort:
pos.sortierung = old_sort
else:
for field, old_val in old_data.items():
if field != '_action':
setattr(pos, field, old_val)
ft = pos.formel_typ or 'standard'
skip_me = ('menge' in old_data) and ft != 'frei'
pos.berechne_menge(recalc_hinten=('menge_hinten' not in old_data), skip_menge_recalc=skip_me)
undo_hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='undo',
description=f"Rückgängig: {last_change.description}",
diff=json.dumps({'undo_of': last_change.id})
)
db.session.add(undo_hist)
db.session.commit()
return jsonify({'ok': True, 'message': f'Rückgängig: {last_change.description[:50]}'})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/redo', methods=['POST'])
@login_required
def redo(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
last_undo = (AufmassHistory.query
.filter_by(aufmass_id=aufmass_id, action='undo')
.order_by(AufmassHistory.id.desc())
.first())
if not last_undo:
return jsonify({'error': 'Nichts zum Wiederholen'}), 400
changes = json.loads(last_undo.diff)
if 'undo_of' not in changes:
return jsonify({'error': 'Ungültiger Undo-Eintrag'}), 400
original = AufmassHistory.query.get(changes['undo_of'])
if not original:
return jsonify({'error': 'Original nicht gefunden'}), 400
original_changes = json.loads(original.diff)
for change in original_changes:
pos = Position.query.get(change['position_id'])
new_data = change.get('new', {})
old_data = change.get('old', {})
if new_data.get('_action') in ('add', 'add_empty', 'add_form'):
# Redo add: Position wiederherstellen
if not pos:
pos = Position(
aufmass_id=aufmass_id,
lv_position_id=old_data.get('lv_pos_id'),
pos_nr=old_data.get('pos_nr', ''),
kurztext=old_data.get('kurztext', ''),
faktor=old_data.get('faktor', 1.0),
laenge=old_data.get('laenge', 0),
breite=old_data.get('breite', 0),
tiefe=old_data.get('tiefe', 0),
menge=old_data.get('menge', 0),
einheit=old_data.get('einheit', ''),
formel_typ=old_data.get('formel_typ', 'standard'),
formel=old_data.get('formel', ''),
abschnitt=old_data.get('abschnitt', ''),
bemerkung=old_data.get('bemerkung', ''),
sort_order=old_data.get('sort_order', 0)
)
db.session.add(pos)
db.session.flush()
elif new_data.get('_action') == 'delete':
# Redo delete: Position löschen
if pos:
db.session.delete(pos)
elif new_data.get('_action') == 'copy':
# Redo copy: Kopie wiederherstellen
old_id = new_data.get('old_id')
orig = Position.query.get(old_id)
if orig:
copy = Position(
project_id=project_id, aufmass_id=aufmass_id,
lv_position_id=orig.lv_position_id,
pos_nr=orig.pos_nr,
sortierung=pos.sortierung if pos else old_data.get('sort_order', 0) + 1,
rsa=orig.rsa or '', abschnitt=orig.abschnitt or '',
kurztext=orig.kurztext or '', langtext=orig.langtext or '',
einheit=orig.einheit or '', einzelpreis=orig.einzelpreis or 0,
faktor=orig.faktor, laenge=orig.laenge, breite=orig.breite, tiefe=orig.tiefe,
menge=orig.menge, gesamtpreis=orig.gesamtpreis,
formel_typ=orig.formel_typ or 'standard', formel=orig.formel or '',
bemerkung=orig.bemerkung or '',
menge_hinten=orig.menge_hinten or 0,
)
db.session.add(copy)
db.session.flush()
# Update the original position reference
if pos:
pos.sortierung = new_data.get('new_id')
elif new_data.get('_action') == 'reorder':
# Redo reorder: Neue Reihenfolge wiederherstellen
new_order = new_data.get('order', [])
if new_order:
for i, pos_id in enumerate(new_order):
p = Position.query.get(pos_id)
if p:
p.sortierung = i + 1
else:
# Normal edit: new values wiederherstellen
if pos:
for field, new_val in new_data.items():
if field != '_action':
setattr(pos, field, new_val)
ft = pos.formel_typ or 'standard'
skip_me = ('menge' in new_data) and ft != 'frei'
pos.berechne_menge(recalc_hinten=('menge_hinten' not in new_data), skip_menge_recalc=skip_me)
redo_hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='redo',
description=f"Wiederholt: {original.description}",
diff=json.dumps({'redo_of': original.id})
)
db.session.add(redo_hist)
db.session.commit()
return jsonify({'ok': True, 'message': f'Wiederholt: {original.description[:50]}'})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/history', methods=['GET'])
@login_required
def history(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'lesen'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
limit = request.args.get('limit', 10, type=int)
hist_list = (AufmassHistory.query
.filter_by(aufmass_id=aufmass_id)
.order_by(AufmassHistory.changed_at.desc())
.limit(limit)
.all())
data = []
for h in hist_list:
user = User.query.get(h.changed_by)
user_name = f"{user.vorname} {user.nachname}".strip() if user else 'Unbekannt'
try:
changes = json.loads(h.diff) if h.diff else []
except Exception as e:
changes = []
# Build long description
long_desc = h.description or ''
if h.action == 'undo' and changes.get('undo_of'):
# For undo, get original action description and build long version
orig = AufmassHistory.query.get(changes['undo_of'])
if orig:
try:
orig_changes = json.loads(orig.diff) if orig.diff else []
long_desc = _build_long_description(orig_changes, user_name, h.changed_at, prefix='↩ Rückgängig: ')
except:
long_desc = f"↩ Rückgängig: {orig.description}"
elif h.action == 'redo' and changes.get('undo_of'):
# For redo, the diff contains undo_of pointing to the undo entry
# We need to get the original change from that undo entry
undo_entry = AufmassHistory.query.get(changes['undo_of'])
if undo_entry and undo_entry.action == 'undo':
undo_diff = json.loads(undo_entry.diff) if undo_entry.diff else {}
if undo_diff.get('undo_of'):
orig = AufmassHistory.query.get(undo_diff['undo_of'])
if orig:
try:
orig_changes = json.loads(orig.diff) if orig.diff else []
long_desc = _build_long_description(orig_changes, user_name, h.changed_at, prefix='↪ Wiederholt: ')
except:
long_desc = f"↪ Wiederholt: {orig.description}"
elif changes:
# Normal case: build from changes
try:
long_desc = _build_long_description(changes, user_name, h.changed_at)
except Exception as e:
long_desc = h.description or ''
data.append({
'id': h.id,
'user': user.email if user else 'unbekannt',
'user_name': user_name,
'time': h.changed_at.strftime('%d.%m.%Y %H:%M:%S'),
'time_iso': h.changed_at.isoformat(),
'action': h.action,
'description': h.description or '',
'long_description': long_desc,
'diff': changes
})
return jsonify(data)
def _build_long_description(changes, user_name, dt, prefix=''):
"""Baut die lange History-Beschreibung."""
if not changes:
return ''
from datetime import datetime
now = dt.strftime('%d.%m.%Y %H:%M') if dt else datetime.now().strftime('%d.%m.%Y %H:%M')
lines = [f"[{now}] {user_name}"]
field_labels = {
'faktor': 'Faktor', 'laenge': 'Länge', 'breite': 'Breite', 'tiefe': 'Tiefe',
'menge': 'Menge', 'menge_hinten': 'Menge hinten', 'einheit': 'EH',
'abschnitt': 'Abschnitt', 'bemerkung': 'Bemerkung', 'formel': 'Formel',
'formel_typ': 'Z-Art', 'kurztext': 'Kurztext', 'pos_nr': 'Pos-Nr'
}
for c in changes:
pos_nr = c.get('pos_nr', c.get('position_id', '?'))
kurztext = c.get('kurztext', '')
row_idx = c.get('row_idx')
line_num = f"Zeile {row_idx + 1}" if row_idx is not None else "?"
kt = f' "{kurztext}"' if kurztext else ''
old_data = c.get('old', {})
new_data = c.get('new', {})
action = new_data.get('_action', 'edit') if new_data else 'edit'
if action in ('add', 'add_empty', 'add_form'):
lines.append(f"+ {line_num}: Pos {pos_nr}{kt} - HINZUGEFÜGT")
elif action == 'delete':
lines.append(f"- {line_num}: Pos {pos_nr}{kt} - GELÖSCHT")
if old_data:
details = []
for k, v in old_data.items():
if k not in ('_action', 'id') and v:
lbl = field_labels.get(k, k)
details.append(f" {lbl}: {v}")
if details:
lines.extend(details[:5])
elif action == 'copy':
lines.append(f"{line_num}: Pos {pos_nr}{kt} - KOPIERT")
elif action == 'reorder':
old_idx = old_data.get('sort_order', '?')
new_idx = new_data.get('sort_order', '?')
if old_idx != new_idx:
direction = '' if new_idx > old_idx else ''
lines.append(f"{line_num}: Pos {pos_nr}{kt} - {old_idx} {direction} {new_idx}")
else:
# Edit
diffs = []
for field in set(list(old_data.keys()) + list(new_data.keys())):
if field.startswith('_') or field == 'id':
continue
old_v = old_data.get(field)
new_v = new_data.get(field)
if old_v is None and new_v is None:
continue
old_s = str(old_v) if old_v is not None else 'leer'
new_s = str(new_v) if new_v is not None else 'leer'
if old_s != new_s:
lbl = field_labels.get(field, field)
diffs.append(f" {lbl}: {old_s}{new_s}")
if diffs:
lines.append(f"{line_num}: Pos {pos_nr}{kt}")
lines.extend(diffs[:4])
if len(diffs) > 4:
lines.append(f" ... (+{len(diffs)-4} weitere)")
if prefix and lines:
lines[0] = prefix + lines[0]
return '\n'.join(lines)
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/history/count', methods=['GET'])
@login_required
def history_count(project_id, aufmass_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'lesen'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
change_count = AufmassHistory.query.filter_by(aufmass_id=aufmass_id, action='change').count()
undo_count = AufmassHistory.query.filter_by(aufmass_id=aufmass_id, action='undo').count()
return jsonify({'changes': change_count, 'undo_count': undo_count, 'can_undo': change_count > undo_count})
@aufmass_bp.route('/<int:project_id>/<int:aufmass_id>/history/<int:history_id>/undo', methods=['POST'])
@login_required
def history_undo_specific(project_id, aufmass_id, history_id):
project = Project.query.get_or_404(project_id)
if not _check_zugriff(project, 'schreiben'):
return jsonify({'error': 'Zugriff verweigert'}), 403
a = _get_aufmass_or_404(aufmass_id, project_id)
if not a:
return jsonify({'error': 'Aufmaß nicht gefunden'}), 404
h = AufmassHistory.query.get_or_404(history_id)
if h.aufmass_id != aufmass_id:
return jsonify({'error': 'History-Eintrag gehört nicht zu diesem Aufmaß'}), 400
if h.action != 'change':
return jsonify({'error': 'Nur Änderungen können rückgängig gemacht werden'}), 400
try:
changes = json.loads(h.diff) if h.diff else []
except:
return jsonify({'error': 'Ungültige History-Daten'}), 400
ft_cache = {}
for c in changes:
pos_id = c.get('position_id')
if not pos_id:
continue
pos = Position.query.get(pos_id)
if not pos:
continue
old_data = c.get('old', {})
new_data = c.get('new', {})
action = new_data.get('_action', 'edit') if new_data else 'edit'
if action in ('add', 'add_empty', 'add_form'):
# Delete the position
Position.query.filter_by(id=pos_id).delete()
elif action == 'delete':
# Restore the position from old_data
rest_pos = Position(
aufmass_id=aufmass_id,
lv_position_id=old_data.get('lv_pos_id'),
pos_nr=old_data.get('pos_nr', ''),
kurztext=old_data.get('kurztext', ''),
faktor=old_data.get('faktor', 1.0),
laenge=old_data.get('laenge', 0),
breite=old_data.get('breite', 0),
tiefe=old_data.get('tiefe', 0),
menge=old_data.get('menge', 0),
einheit=old_data.get('einheit', ''),
formel_typ=old_data.get('formel_typ', 'standard'),
formel=old_data.get('formel', ''),
abschnitt=old_data.get('abschnitt', ''),
bemerkung=old_data.get('bemerkung', ''),
sort_order=old_data.get('sort_order', 0)
)
db.session.add(rest_pos)
elif action == 'copy':
# Delete the copied position
Position.query.filter_by(id=pos_id).delete()
elif action == 'reorder':
# Restore old sort order
old_order = old_data.get('sort_order')
if old_order:
pos.sortierung = old_order
else:
# Edit - restore old values
for field, old_val in old_data.items():
if field != '_action' and field != 'id':
setattr(pos, field, old_val)
ft = pos.formel_typ or 'standard'
skip_me = ('menge' in old_data) and ft != 'frei'
pos.berechne_menge(recalc_hinten=('menge_hinten' not in old_data), skip_menge_recalc=skip_me)
# Create undo history entry
undo_hist = AufmassHistory(
aufmass_id=aufmass_id,
changed_by=current_user.id,
action='undo',
description=f"Rückgängig (aus History): {h.description[:50]}",
diff=json.dumps({'undo_of': h.id})
)
db.session.add(undo_hist)
db.session.commit()
return jsonify({'ok': True, 'message': f'Rückgängig gemacht: {h.description[:50]}'})