Initial commit – AufmaßCreater v2.35

This commit is contained in:
2026-06-10 11:03:43 +02:00
commit 84c933ea9c
2823 changed files with 490495 additions and 0 deletions
+433
View File
@@ -0,0 +1,433 @@
from flask import Blueprint, render_template, send_file, flash, redirect, url_for, request, jsonify, Response
from flask_login import login_required, current_user
from app.extensions import db
from app.models.project import Project
from app.models.position import Position
from app.services.export_service import export_project_to_excel
from app.services.export_pdf_service import export_project_to_pdf
from app.services.export_x31_service import export_to_x31, convert_to_california
import os, tempfile, io, zipfile, json
export_bp = Blueprint('export', __name__)
def _build_filename(project, suffix):
parts = [project.bezeichnung or '', project.baustelle or '', project.bauabschnitt or '', project.sm_nr or '', project.abruf_nr or '']
name = ' - '.join(p for p in parts if p)
return f'Aufmass {name}{suffix}'
def _get_aufmass_and_positions(project_id, aufmass_id=None, visible_ids=None):
project = Project.query.get_or_404(project_id)
if not current_user.hat_zugriff(project, 'lesen'):
return None, None, None
from app.models.aufmass import Aufmass
if aufmass_id:
aufmass = Aufmass.query.get_or_404(aufmass_id)
else:
aufmass = Aufmass.query.filter_by(project_id=project_id).order_by(Aufmass.sortierung).first()
if not aufmass:
return None, None, None
pos_query = Position.query.filter_by(aufmass_id=aufmass.id)
if visible_ids:
ids = [int(x) for x in visible_ids.split(',') if x.isdigit()]
if ids:
pos_query = pos_query.filter(Position.id.in_(ids))
positionen = pos_query.order_by(Position.sortierung).all()
return project, aufmass, positionen
@export_bp.route('/<int:project_id>/excel')
@login_required
def excel(project_id):
project = Project.query.get_or_404(project_id)
if not current_user.hat_zugriff(project, 'lesen'):
flash('Zugriff verweigert.', 'danger')
return redirect(url_for('aufmass.index'))
from app.models.aufmass import Aufmass
aufmass_id = request.args.get('aufmass_id', type=int)
if aufmass_id:
aufmass = Aufmass.query.get_or_404(aufmass_id)
else:
aufmass = Aufmass.query.filter_by(project_id=project_id).order_by(Aufmass.sortierung).first()
if not aufmass:
flash('Kein Aufmaß vorhanden.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
pos_query = Position.query.filter_by(aufmass_id=aufmass.id)
visible_ids = request.args.get('visible_ids')
if visible_ids:
ids = [int(x) for x in visible_ids.split(',') if x.isdigit()]
if ids:
pos_query = pos_query.filter(Position.id.in_(ids))
pos = pos_query.order_by(Position.sortierung).all()
fd, path = tempfile.mkstemp(suffix='.xlsx')
os.close(fd)
try:
company = current_user.company if not current_user.is_superadmin() else None
export_project_to_excel(project, aufmass, pos, path, company=company)
return send_file(path, as_attachment=True,
download_name=_build_filename(project, '.xlsx'))
except Exception as e:
flash(f'Fehler beim Export: {e}', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
@export_bp.route('/<int:project_id>/pdf')
@login_required
def pdf(project_id):
project = Project.query.get_or_404(project_id)
if not current_user.hat_zugriff(project, 'lesen'):
flash('Zugriff verweigert.', 'danger')
return redirect(url_for('aufmass.index'))
from app.models.aufmass import Aufmass
aufmass_id = request.args.get('aufmass_id', type=int)
if aufmass_id:
aufmass = Aufmass.query.get_or_404(aufmass_id)
else:
aufmass = Aufmass.query.filter_by(project_id=project_id).order_by(Aufmass.sortierung).first()
if not aufmass:
flash('Kein Aufmaß vorhanden.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
pos_query = Position.query.filter_by(aufmass_id=aufmass.id)
visible_ids = request.args.get('visible_ids')
if visible_ids:
ids = [int(x) for x in visible_ids.split(',') if x.isdigit()]
if ids:
pos_query = pos_query.filter(Position.id.in_(ids))
positionen = pos_query.order_by(Position.sortierung).all()
company = current_user.company if not current_user.is_superadmin() else None
if request.args.get('html') == '1':
from app.services.export_pdf_service import _render_pdf_html
html = _render_pdf_html(project, aufmass, positionen, company=company)
return Response(html, mimetype='text/html')
fd, path = tempfile.mkstemp(suffix='.pdf')
os.close(fd)
try:
export_project_to_pdf(project, aufmass, positionen, path, company=company)
return send_file(path, as_attachment=True, download_name=_build_filename(project, '.pdf'))
except Exception as e:
flash(f'Fehler beim PDF-Export: {e}', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
@export_bp.route('/<int:project_id>/txt', methods=['GET','POST'])
@login_required
def txt(project_id):
project = Project.query.get_or_404(project_id)
if not current_user.hat_zugriff(project, 'lesen'):
flash('Zugriff verweigert.', 'danger')
return redirect(url_for('aufmass.index'))
from app.models.aufmass import Aufmass
aufmass_id = request.args.get('aufmass_id', type=int)
if aufmass_id:
aufmass = Aufmass.query.get_or_404(aufmass_id)
else:
aufmass = Aufmass.query.filter_by(project_id=project_id).order_by(Aufmass.sortierung).first()
if not aufmass:
flash('Kein Aufmaß vorhanden.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
pos_query = Position.query.filter_by(aufmass_id=aufmass.id)
visible_ids = request.args.get('visible_ids')
if visible_ids:
ids = [int(x) for x in visible_ids.split(',') if x.isdigit()]
if ids:
pos_query = pos_query.filter(Position.id.in_(ids))
positionen = pos_query.order_by(Position.sortierung).all()
def _d(v):
if v is None:
return ''
if hasattr(v, 'strftime'):
return v.strftime('%d.%m.%Y')
s = str(v)[:10]
for fmt in ('%Y-%m-%d', '%d.%m.%Y'):
try:
from datetime import datetime
return datetime.strptime(s, fmt).strftime('%d.%m.%Y')
except ValueError:
continue
return s
typ = aufmass.typ if aufmass else ''
ist_teil = typ and 'Teilaufma' in typ
ap_vorname = project.ansprechpartner_vorname or ''
ap_nachname = project.ansprechpartner_nachname or ''
ap_name = f'{ap_vorname} {ap_nachname}'.strip()
lines = ['[Kopfdaten]']
lines.append(f'Teilaufma={"X" if ist_teil else ""}')
lines.append(f'Schlussaufma={"X" if not ist_teil else ""}')
lines.append(f'Datum={_d(project.datum)}')
lines.append(f'Baustelle={project.baustelle or ""}')
lines.append(f'AbrufNr={project.abruf_nr or ""}')
lines.append(f'SMNr={project.sm_nr or ""}')
lines.append(f'Vertrag={project.lv_name or ""}')
lines.append(f'StartZ={_d(project.datum_start)}')
lines.append(f'EndZ={_d(project.datum_ende)}')
lines.append(f'AspaN={ap_name}')
lines.append(f'AspaTel={project.ansprechpartner_tel or ""}')
lines.append(f'Bauabschnitt={project.bauabschnitt or ""}')
lines.append(f'Kolone=')
lines.append('[Aufmaßdaten]')
def g(v, is_money=False):
if v is None or v == 0 or v == '':
return ''
try:
n = float(str(v).replace(',','.'))
except (ValueError, TypeError):
return str(v)
if is_money:
return f'{n:.2f}'.replace('.',',')
s = f'{n:.2f}'.replace('.',',')
s = s.rstrip('0').rstrip(',')
return s
for p in positionen:
rows = [
p.abschnitt or '',
g(p.pos_nr) if p.pos_nr else '',
g(p.faktor),
g(p.laenge),
g(p.breite),
g(p.tiefe),
g(p.menge),
]
if p.einheit in ('ST', 'LE', 'STD', 'h', 'Psch'):
rows[6] = g(p.faktor * 1) if p.faktor else ''
rows += [
p.einheit or '',
p.kurztext or '',
p.bemerkung or '',
g(p.menge_hinten),
g(p.einzelpreis, is_money=True),
g(p.gesamtpreis, is_money=True),
]
lines.append('|' + '|'.join(rows))
buf = io.BytesIO()
buf.write('\n'.join(lines).encode('utf-8'))
buf.seek(0)
return send_file(buf, as_attachment=True, download_name=_build_filename(project, '.txt'), mimetype='text/plain; charset=utf-8')
@export_bp.route('/<int:project_id>/x31')
@login_required
def x31(project_id):
project, aufmass, positionen = _get_aufmass_and_positions(
project_id,
request.args.get('aufmass_id', type=int),
request.args.get('visible_ids')
)
if not project:
flash('Zugriff verweigert oder kein Aufmaß vorhanden.', 'danger')
return redirect(url_for('aufmass.index'))
xml_bytes = export_to_x31(project, aufmass, positionen)
if xml_bytes is None:
flash('Keine gültigen Positionen für X31-Export.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
buf = io.BytesIO(xml_bytes)
buf.seek(0)
return send_file(buf, as_attachment=True,
download_name=_build_filename(project, '.x31'),
mimetype='application/xml; charset=utf-8')
@export_bp.route('/<int:project_id>/x31_california')
@login_required
def x31_california(project_id):
project, aufmass, positionen = _get_aufmass_and_positions(
project_id,
request.args.get('aufmass_id', type=int),
request.args.get('visible_ids')
)
if not project:
flash('Zugriff verweigert oder kein Aufmaß vorhanden.', 'danger')
return redirect(url_for('aufmass.index'))
xml_bytes = export_to_x31(project, aufmass, positionen)
if xml_bytes is None:
flash('Keine gültigen Positionen für X31 California-Export.', 'danger')
return redirect(url_for('aufmass.aufmass_list', project_id=project_id))
ap_vorname = project.ansprechpartner_vorname or ''
ap_nachname = project.ansprechpartner_nachname or ''
owner_name = f'{ap_vorname} {ap_nachname}'.strip()
s_lv_name = (project.lv_name or project.baustelle or '')
cal_bytes = convert_to_california(
xml_bytes,
ref_prj_name=project.baustelle or '',
ref_prj_id=s_lv_name[:20],
owner_name=owner_name
)
buf = io.BytesIO(cal_bytes)
buf.seek(0)
return send_file(buf, as_attachment=True,
download_name=_build_filename(project, '.x31ca'),
mimetype='application/xml; charset=utf-8')
@export_bp.route('/<int:project_id>/zip_download', methods=['POST'])
@login_required
def zip_download(project_id):
project = Project.query.get_or_404(project_id)
if not current_user.hat_zugriff(project, 'lesen'):
return jsonify({'error': 'Zugriff verweigert'}), 403
try:
exports = json.loads(request.form.get('exports', '[]'))
base_params = request.form.get('base_params', '')
zip_name = request.form.get('zip_name', 'export.zip')
except:
return jsonify({'error': 'Ungültige Anfrage'}), 400
if not exports:
return jsonify({'error': 'Keine Exporte ausgewählt'}), 400
from app.models.aufmass import Aufmass
aufmass_id = request.args.get('aufmass_id') or (base_params.split('aufmass_id=')[1].split('&')[0] if 'aufmass_id=' in base_params else None)
visible_ids = base_params.split('visible_ids=')[1].split('&')[0] if 'visible_ids=' in base_params else None
aufmass = Aufmass.query.get(aufmass_id) if aufmass_id else Aufmass.query.filter_by(project_id=project_id).order_by(Aufmass.sortierung).first()
if not aufmass:
return jsonify({'error': 'Kein Aufmaß gefunden'}), 400
pos_query = Position.query.filter_by(aufmass_id=aufmass.id)
if visible_ids:
ids = [int(x) for x in visible_ids.split(',') if x.isdigit()]
if ids:
pos_query = pos_query.filter(Position.id.in_(ids))
positionen = pos_query.order_by(Position.sortierung).all()
company = current_user.company if not current_user.is_superadmin() else None
temp_files = []
fd, zip_path = tempfile.mkstemp(suffix='.zip')
os.close(fd)
temp_files.append(zip_path)
try:
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
for exp in exports:
exp_id = exp['id']
suffix = exp['suffix']
if exp_id == 'excel':
fd, path = tempfile.mkstemp(suffix='.xlsx')
os.close(fd)
temp_files.append(path)
export_project_to_excel(project, aufmass, positionen, path, company=company)
zf.write(path, f'Aufmass - Excel{suffix}')
elif exp_id == 'pdf':
fd, path = tempfile.mkstemp(suffix='.pdf')
os.close(fd)
temp_files.append(path)
export_project_to_pdf(project, aufmass, positionen, path, company=company)
zf.write(path, f'Aufmass - PDF{suffix}')
elif exp_id == 'txt':
buf = io.BytesIO()
buf.write(_generate_txt_content(project, aufmass, positionen).encode('utf-8'))
buf.seek(0)
zf.writestr(f'Aufmass - TXT{suffix}', buf.getvalue())
elif exp_id == 'x31':
xml_bytes = export_to_x31(project, aufmass, positionen)
if xml_bytes:
zf.writestr(f'Aufmass - X31{suffix}', xml_bytes)
elif exp_id == 'x31ca':
xml_bytes = export_to_x31(project, aufmass, positionen)
if xml_bytes:
ap_vorname = project.ansprechpartner_vorname or ''
ap_nachname = project.ansprechpartner_nachname or ''
owner_name = f'{ap_vorname} {ap_nachname}'.strip()
s_lv_name = (project.lv_name or project.baustelle or '')
cal_bytes = convert_to_california(
xml_bytes,
ref_prj_name=project.baustelle or '',
ref_prj_id=s_lv_name[:20],
owner_name=owner_name
)
zf.writestr(f'Aufmass - X31 California{suffix}', cal_bytes)
return send_file(zip_path, as_attachment=True, download_name=zip_name, mimetype='application/zip')
finally:
for path in temp_files:
try:
os.unlink(path)
except:
pass
def _generate_txt_content(project, aufmass, positionen):
def _d(v):
if v is None:
return ''
if hasattr(v, 'strftime'):
return v.strftime('%d.%m.%Y')
s = str(v)[:10]
for fmt in ('%Y-%m-%d', '%d.%m.%Y'):
try:
from datetime import datetime
return datetime.strptime(s, fmt).strftime('%d.%m.%Y')
except ValueError:
continue
return s
typ = aufmass.typ if aufmass else ''
ist_teil = typ and 'Teilaufma' in typ
ap_vorname = project.ansprechpartner_vorname or ''
ap_nachname = project.ansprechpartner_nachname or ''
ap_name = f'{ap_vorname} {ap_nachname}'.strip()
lines = ['[Kopfdaten]']
lines.append(f'Teilaufma={"X" if ist_teil else ""}')
lines.append(f'Schlussaufma={"X" if not ist_teil else ""}')
lines.append(f'Datum={_d(project.datum)}')
lines.append(f'Baustelle={project.baustelle or ""}')
lines.append(f'AbrufNr={project.abruf_nr or ""}')
lines.append(f'SMNr={project.sm_nr or ""}')
lines.append(f'Vertrag={project.lv_name or ""}')
lines.append(f'StartZ={_d(project.datum_start)}')
lines.append(f'EndZ={_d(project.datum_ende)}')
lines.append(f'AspaN={ap_name}')
lines.append(f'AspaTel={project.ansprechpartner_tel or ""}')
lines.append(f'Bauabschnitt={project.bauabschnitt or ""}')
lines.append(f'Kolone=')
lines.append('[Aufmaßdaten]')
def g(v, is_money=False):
if v is None or v == 0 or v == '':
return ''
try:
n = float(str(v).replace(',','.'))
except (ValueError, TypeError):
return str(v)
if is_money:
return f'{n:.2f}'.replace('.',',')
s = f'{n:.2f}'.replace('.',',')
s = s.rstrip('0').rstrip(',')
return s
for p in positionen:
rows = [
p.abschnitt or '',
g(p.pos_nr) if p.pos_nr else '',
g(p.faktor),
g(p.laenge),
g(p.breite),
g(p.tiefe),
g(p.menge),
]
if p.einheit in ('ST', 'LE', 'STD', 'h', 'Psch'):
rows[6] = g(p.faktor * 1) if p.faktor else ''
rows += [
p.einheit or '',
p.kurztext or '',
p.bemerkung or '',
g(p.menge_hinten),
g(p.einzelpreis, is_money=True),
g(p.gesamtpreis, is_money=True),
]
lines.append('|' + '|'.join(rows))
return '\n'.join(lines)