# Source: aufmass-web/_aufmass_web/app/services/export_x31_service.py
# (Python, 486 lines, 16 KiB)

import re, uuid, io
from datetime import datetime
_g_id_counter = 1000001
def _next_id():
global _g_id_counter
rv = _g_id_counter
_g_id_counter += 1
return rv
def _xe(s):
s = s.replace('&', '&')
s = s.replace('<', '&lt;')
s = s.replace('>', '&gt;')
s = s.replace('"', '&quot;')
s = s.replace("'", '&apos;')
return s
def _to_float(s):
if not s:
return 0.0
s = s.strip()
while s and s[-1] in (',', '.'):
s = s[:-1]
s = s.replace(',', '.')
try:
return float(s)
except ValueError:
return 0.0
def _fmt_qty(fval):
fabs = abs(fval)
ganz = int(fabs)
dez = fabs - ganz
d3 = int(dez * 1000 + 0.5)
if d3 >= 1000:
ganz += 1
d3 = 0
vorz = '-' if fval < 0 else ''
if d3 == 0:
return f'{vorz}{ganz}'
sdez = str(d3).zfill(3).rstrip('0')
return f'{vorz}{ganz},{sdez}'
def _oz_info(oz):
result = {'typ': '?', 'e1': '', 'e2': '', 'e3': '', 'pos': oz,
'original': oz, 'len_e1': 0, 'len_e2': 0, 'len_e3': 0, 'len_pos': 0}
if not oz:
return result
if re.match(r'^\d{6,10}$', oz):
result['typ'] = 'C'
result['pos'] = oz
result['len_pos'] = len(oz)
return result
parts = oz.split('.')
n = len(parts)
if n == 3:
result['typ'] = 'A'
result['e1'] = parts[0]
result['e2'] = parts[1]
result['pos'] = parts[2]
result['len_e1'] = len(parts[0])
result['len_e2'] = len(parts[1])
result['len_pos'] = len(parts[2])
elif n == 4:
result['typ'] = 'B'
result['e1'] = parts[0]
result['e2'] = parts[1]
result['e3'] = parts[2]
result['pos'] = parts[3]
result['len_e1'] = len(parts[0])
result['len_e2'] = len(parts[1])
result['len_e3'] = len(parts[2])
result['len_pos'] = len(parts[3])
return result
def _is_valid_oz(oz):
if not oz:
return False
if re.match(r'^\d{1,4}\.\d{1,4}\.\d{1,6}$', oz):
return True
if re.match(r'^\d{1,4}\.\d{1,4}\.\d{1,4}\.\d{1,6}$', oz):
return True
if re.match(r'^\d{6,10}$', oz):
return True
return False
def _reboz_code(idx):
blatt = 1000 + idx // 26
zeile = chr(65 + idx % 26)
return f'{blatt}{zeile}0'
def _k_zeile(ort, oz_code):
ort_pad = (ort + ' ' * 56)[:56]
return f' *{ort_pad}{oz_code} '
def _l_zeile(menge, oz_code):
menge_k = menge.strip().replace('.', ',')
formel = f'100091{menge_k}='
formel = formel[:44].ljust(44)
return f' {formel}{oz_code} '
def _make_qdeterm_pair(ort, qty, oz_cnt):
    """Render the two <QDetermItem> elements for a single measurement.

    The first item carries the comment row for *ort* plus a BVBS:Explanation
    with the stripped location text; the second carries the formula row for
    *qty*. Each row consumes one row code, so the counter advances by two.

    Returns:
        Tuple ``(xml_fragment, next_oz_cnt)``.
    """
    comment_code = _reboz_code(oz_cnt)
    formula_code = _reboz_code(oz_cnt + 1)

    comment_row = _xe(_k_zeile(ort, comment_code))
    explanation = _xe(ort.strip())
    formula_row = _xe(_l_zeile(_fmt_qty(qty), formula_code))

    fragment = ''.join((
        '<QDetermItem>',
        f'<QTakeoff Row="{comment_row}"/>',
        f'<BVBS:Explanation>{explanation}</BVBS:Explanation>',
        '</QDetermItem>',
        '<QDetermItem>',
        f'<QTakeoff Row="{formula_row}"/>',
        '</QDetermItem>',
    ))
    return fragment, oz_cnt + 2
def _gen_uuid():
return str(uuid.uuid4()).lower()
def _datum_iso(d):
if d is None:
return ''
if isinstance(d, str):
d = d[:10]
for fmt in ('%Y-%m-%d', '%d.%m.%Y'):
try:
return datetime.strptime(d, fmt).strftime('%Y-%m-%d')
except ValueError:
continue
return d
return d.strftime('%Y-%m-%d')
def export_to_x31(project, aufmass, positionen):
    """Build the X31 quantity-determination XML document for a project.

    Positions are grouped by their OZ in order of first appearance. Each
    distinct OZ becomes one <Item> whose <Qty> is the sum of its quantities,
    followed by one pair of <QDetermItem> rows per contributing position.
    The module-level ID counter is reset so element IDs start at DF_1000001.

    Args:
        project: object providing ``datum``, ``baustelle``, ``bauabschnitt``,
            ``lv_name``, ``ansprechpartner_vorname``,
            ``ansprechpartner_nachname`` and ``ansprechpartner_tel``.
        aufmass: accepted for interface compatibility; not read here.
        positionen: iterable of objects; only ``pos_nr``, ``menge_hinten``
            and ``abschnitt`` are read. Entries without a valid OZ are skipped.

    Returns:
        The document as UTF-8 bytes with a leading BOM, or ``None`` when no
        position has a valid OZ.
    """
    global _g_id_counter
    _g_id_counter = 1000001

    # One timestamp for both <Date> (fallback) and <Time>.
    now = datetime.now()
    s_iso_datum = _datum_iso(project.datum) if project.datum else now.strftime('%Y-%m-%d')
    s_time = now.strftime('%H:%M:%S')

    s_baustelle = project.baustelle or ''
    s_bauabs = project.bauabschnitt or ''
    s_vertrag = project.lv_name or ''
    ap_vorname = project.ansprechpartner_vorname or ''
    ap_nachname = project.ansprechpartner_nachname or ''
    s_askan = f'{ap_vorname} {ap_nachname}'.strip()
    s_askatel = project.ansprechpartner_tel or ''
    s_uid = _gen_uuid()
    s_boq_uid = _gen_uuid()
    s_lv_name = s_vertrag or f'{s_baustelle} {s_bauabs}'.strip()
    s_lv_name_20 = s_lv_name[:20]
    s_baust50 = s_baustelle[:50]

    # Single pass over the positions: parse each OZ once, detect the overall
    # OZ layout and collect (location, quantity) per distinct OZ key. The dict
    # keeps the keys in order of first appearance.
    oz_typ = 'A'
    max_pos_len = 4
    groups = {}
    for p in positionen:
        oz = p.pos_nr.strip() if p.pos_nr else ''
        if not _is_valid_oz(oz):
            continue
        info = _oz_info(oz)
        # 'B' wins over everything; 'C' only replaces the default 'A'.
        if info['typ'] == 'B':
            oz_typ = 'B'
        elif info['typ'] == 'C' and oz_typ == 'A':
            oz_typ = 'C'
        max_pos_len = max(max_pos_len, info['len_pos'])
        key = (info['e1'], info['e2'], info['e3'], info['pos'])
        groups.setdefault(key, []).append((p.abschnitt or '', p.menge_hinten or 0.0))
    if not groups:
        return None

    parts = [
        '<?xml version="1.0" encoding="UTF-8"?>\n',
        '<!-- REB 23.003 (2009) - X31 Export AutoIt v8 (Dataflor-kompatibel) -->\n',
        '<GAEB xmlns="http://www.gaeb.de/GAEB_DA_XML/DA31/3.3" xmlns:BVBS="BVBS">',
        '<GAEBInfo>',
        '<Version>3.3</Version>',
        '<VersDate>2023-01</VersDate>',
        f'<Date>{s_iso_datum}</Date>',
        f'<Time>{s_time}</Time>',
        '<ProgSystem>AutoIt REB Engine V1.2</ProgSystem>',
        '<ProgName>AutoIt REB X31 Export</ProgName>',
        '</GAEBInfo>',
        '<QtyDeterm>',
        '<PrjInfo>',
        f'<RefPrjName>{_xe(s_baust50)}</RefPrjName>',
        f'<RefPrjID>{_xe(s_lv_name_20)}</RefPrjID>',
        '</PrjInfo>',
        f'<QtyDetermInfo ID="{s_uid}">',
        '<MethodDescription>REB23003-2009</MethodDescription>',
        '</QtyDetermInfo>',
        '<DP>31</DP>',
        '<OWN><Address>',
        f'<Name1>{_xe(s_askan)}</Name1>',
        '<Name2></Name2><Name3/><Name4/>',
        '<Street></Street><PCode></PCode><City></City>',
        f'<Contact/><Phone>{_xe(s_askatel)}</Phone><Fax/><Email/>',
        '</Address></OWN>',
        '<CTR><Address><Name1/><Name2></Name2><Name3/><Name4/>',
        '<Street></Street><PCode></PCode><City></City>',
        '<Contact/><Phone/><Fax/><Email/></Address></CTR>',
        f'<BoQ ID="DF_{_next_id()}">',
        f'<RefBoQName>{_xe(s_lv_name_20)}</RefBoQName>',
        f'<RefBoQID>{s_boq_uid}</RefBoQID>',
    ]

    # Declared OZ breakdown: hierarchy levels per layout, then item and index.
    # NOTE(review): the level lengths are fixed per layout and do not use the
    # part lengths measured by _oz_info - confirm that this is intended.
    if oz_typ == 'A':
        level_layout = (('Titel', 2), ('Bauteil', 2))
    elif oz_typ == 'B':
        level_layout = (('Titel', 1), ('Bauteil', 1), ('Abschnitt', 2))
    else:
        level_layout = (('Titel', 2),)
    for label, length in level_layout:
        parts.append(
            f'<BoQBkdn><Type>BoQLevel</Type><LblBoQBkdn>{label}</LblBoQBkdn>'
            f'<Length>{length}</Length><Num>No</Num></BoQBkdn>'
        )
    parts.append(
        f'<BoQBkdn><Type>Item</Type><LblBoQBkdn>Position</LblBoQBkdn>'
        f'<Length>{max_pos_len}</Length><Num>No</Num></BoQBkdn>'
    )
    parts.append(
        '<BoQBkdn><Type>Index</Type><LblBoQBkdn>Index</LblBoQBkdn>'
        '<Length>1</Length><Num>No</Num></BoQBkdn>'
    )
    parts.append('<Ctlg><CtlgID>idDIN276_1993</CtlgID>')
    parts.append('<CtlgType>cost group DIN 276-93</CtlgType>')
    parts.append('<CtlgName>DIN 276-93</CtlgName></Ctlg>')
    parts.append('<BoQBody>')

    oz_cnt = 0
    cur_e1 = ''
    cur_e2 = ''
    cur_e3 = ''
    b_il = False  # True while an <Itemlist> (and its category) is open
    for (s_e1, s_e2, s_e3, s_pos), entries in groups.items():
        same_e1 = (s_e1 == cur_e1)
        same_e2 = (s_e2 == cur_e2) and same_e1
        same_e3 = (s_e3 == cur_e3) and same_e2

        # Close whatever no longer matches, innermost level first.
        if oz_typ == 'B' and not same_e3 and b_il:
            parts.append('</Itemlist></BoQBody>')
            parts.append('</BoQCtgy>')
            b_il = False
            cur_e3 = ''
        if not same_e2 and b_il:
            parts.append('</Itemlist></BoQBody>')
            parts.append('</BoQCtgy>')
            b_il = False
            cur_e3 = ''
        if oz_typ == 'B' and not same_e2 and cur_e2 != '':
            parts.append('</BoQBody>')
            parts.append('</BoQCtgy>')
            cur_e2 = ''
            cur_e3 = ''
        if not same_e1 and cur_e1 != '':
            parts.append('</BoQBody>')
            parts.append('</BoQCtgy>')
            cur_e1 = ''

        # Open the levels needed for this OZ, outermost first.
        if s_e1 != cur_e1:
            parts.append(f'<BoQCtgy RNoPart="{_xe(s_e1)}" ID="DF_{_next_id()}">')
            parts.append('<BoQBody>')
            cur_e1 = s_e1
        if oz_typ == 'B' and s_e2 != cur_e2:
            parts.append(f'<BoQCtgy RNoPart="{_xe(s_e2)}" ID="DF_{_next_id()}">')
            parts.append('<BoQBody>')
            cur_e2 = s_e2
            cur_e3 = ''
        if oz_typ != 'B':
            cur_e2 = s_e2
        if oz_typ == 'B':
            cur_e3 = s_e3
        if not b_il:
            if oz_typ == 'B':
                parts.append(f'<BoQCtgy RNoPart="{_xe(cur_e3)}" ID="DF_{_next_id()}">')
            else:
                parts.append(f'<BoQCtgy RNoPart="{_xe(cur_e2)}" ID="DF_{_next_id()}">')
            parts.append('<BoQBody><Itemlist>')
            b_il = True

        # Plain left-to-right accumulation keeps the float result stable.
        f_qty_sum = 0.0
        for _ort, qty in entries:
            f_qty_sum += qty

        parts.append(f'<Item ID="DF_{_next_id()}" RNoPart="{_xe(s_pos)}">')
        parts.append('<QtyDeterm>')
        parts.append(f'<Qty>{_fmt_qty(f_qty_sum).replace(",", ".")}</Qty>')
        for ort, qty in entries:
            pair, oz_cnt = _make_qdeterm_pair(ort, qty, oz_cnt)
            parts.append(pair)
        parts.append('</QtyDeterm>')
        parts.append('</Item>')

    # Close everything that is still open after the last OZ.
    if b_il:
        parts.append('</Itemlist></BoQBody>')
        parts.append('</BoQCtgy>')
    if oz_typ == 'B' and cur_e2 != '':
        parts.append('</BoQBody>')
        parts.append('</BoQCtgy>')
    if cur_e1 != '':
        parts.append('</BoQBody>')
        parts.append('</BoQCtgy>')
    parts.append('</BoQBody>')
    parts.append('</BoQ>')
    parts.append('</QtyDeterm>')
    parts.append('</GAEB>')

    bom = b'\xef\xbb\xbf'
    return bom + ''.join(parts).encode('utf-8')
def convert_to_california(xml_bytes, ref_prj_name='', ref_prj_id='', owner_name=''):
    """Post-process an X31 XML document and return it re-encoded.

    Steps, in order: strip the BOM and normalise line endings to CRLF; encode
    line breaks inside the first <Street> element as ``&#10;``; replace every
    XML comment with the fixed header comment; refresh <Date>, <Time>,
    <ProgSystem> and <ProgName>; optionally override project name, project ID
    and owner name; assign a new QtyDetermInfo GUID; drop CtlgAssignType and
    BVBS:Explanation elements; remove items whose quantity is ``0.000`` /
    ``0,000`` and the containers left empty; renumber IDs and row IDs; and
    finally re-insert line breaks.

    Args:
        xml_bytes: UTF-8 encoded XML, with or without a leading BOM.
        ref_prj_name: if non-empty, new text for every <RefPrjName>.
        ref_prj_id: if non-empty, new text for every <RefPrjID>.
        owner_name: if non-empty, new text for <Name1> inside the first <OWN>.

    Returns:
        The converted document as UTF-8 bytes with a leading BOM.
    """

    def _set_text(tag, value, text):
        # A callable replacement inserts *value* literally. A template string
        # would interpret backslashes in caller-supplied text as escapes
        # ('\n' -> newline, '\d' -> "bad escape" error).
        return re.sub(
            rf'(<{tag}>)[^<]*(</{tag}>)',
            lambda mo: mo.group(1) + value + mo.group(2),
            text,
        )

    content = xml_bytes.decode('utf-8')
    if content.startswith('\ufeff'):
        content = content[1:]

    # Collapse every line-ending flavour to '\n' first, then expand to CRLF.
    content = content.replace('\r\n', '\n')
    content = content.replace('\r', '\n')
    content = content.replace('\n', '\r\n')

    m = re.search(r'(<Street>)([\s\S]*?)(</Street>)', content)
    if m:
        street_alt = m.group(0)
        street_neu = m.group(1) + m.group(2).replace('\r\n', '&#10;') + m.group(3)
        content = content.replace(street_alt, street_neu)

    content = re.sub(r'<!--[\s\S]*?-->', '<!-- REB 23.003 (2009) - X31 Export AutoIt v5 -->', content)

    # Take date and time from the same instant.
    now = datetime.now()
    content = _set_text('Date', now.strftime('%Y-%m-%d'), content)
    content = _set_text('Time', now.strftime('%H:%M:%S'), content)
    content = _set_text('ProgSystem', 'Python REB Engine V1.2', content)
    content = _set_text('ProgName', 'Python REB X31 Export', content)
    if ref_prj_name:
        content = _set_text('RefPrjName', _xe(ref_prj_name), content)
    if ref_prj_id:
        content = _set_text('RefPrjID', _xe(ref_prj_id), content)

    neue_guid = str(uuid.uuid4()).lower()
    content = re.sub(
        r'(QtyDetermInfo\s+ID=")[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}(")',
        rf'\g<1>{neue_guid}\g<2>',
        content
    )

    if owner_name:
        # Only <Name1> inside the first <OWN> element is touched, so the
        # <CTR> address keeps its own name.
        m_own = re.search(r'(<OWN>[\s\S]*?</OWN>)', content)
        if m_own:
            own_alt = m_own.group(1)
            own_neu = _set_text('Name1', _xe(owner_name), own_alt)
            content = content.replace(own_alt, own_neu)

    content = re.sub(r'\s*<CtlgAssignType\s*/>', '', content)
    content = re.sub(r'\s*<CtlgAssignType>[^<]*</CtlgAssignType>', '', content)
    content = re.sub(r'<BVBS:Explanation>[^<]*</BVBS:Explanation>', '', content)

    # Remove items that consist only of a zero quantity; repeat until stable
    # (bounded to 30 passes).
    imax = 30
    while imax > 0:
        vorher = content
        content = re.sub(
            r'<Item\s[^>]+>\s*<QtyDeterm>\s*<Qty>0[,.]000</Qty>\s*</QtyDeterm>\s*</Item>',
            '',
            content
        )
        if content == vorher:
            break
        imax -= 1

    # Removing an inner container can empty its parent, so prune repeatedly
    # (bounded to 10 passes).
    imax = 10
    while imax > 0:
        vorher = content
        content = re.sub(r'<Itemlist>\s*</Itemlist>', '', content)
        content = re.sub(r'<BoQBody>\s*</BoQBody>', '', content)
        content = re.sub(r'<BoQCtgy[^>]*>\s*</BoQCtgy>', '', content)
        if content == vorher:
            break
        imax -= 1

    content = _renumber_ids(content)
    content = _renumber_zeilen_ids(content)
    content = _format_xml(content)

    bom = b'\xef\xbb\xbf'
    return bom + content.encode('utf-8')
def _renumber_ids(xml_str):
all_ids = re.findall(r'ID="(DF_\d+)"', xml_str)
if not all_ids:
return xml_str
unique = sorted(set(all_ids))
next_id = 1000001
for old_id in unique:
new_id = f'DF_{next_id}'
next_id += 1
xml_str = xml_str.replace(f'ID="{old_id}"', f'ID="{new_id}"')
return xml_str
def _renumber_zeilen_ids(xml_str):
blatt = 1
pos = 0
while True:
m = re.search(r'QTakeoff Row="', xml_str[pos:])
if not m:
break
i_start = pos + m.start()
i_row_start = i_start + 14
i_row_end = xml_str.index('"', i_row_start)
row_len = i_row_end - i_row_start
if row_len == 80:
neue_id = f'{blatt:04d}A0'
row_old = xml_str[i_row_start:i_row_end]
row_new = row_old[:69] + neue_id + row_old[75:]
xml_str = xml_str[:i_row_start] + row_new + xml_str[i_row_end:]
blatt += 1
pos = i_row_start + 1
return xml_str
def _format_xml(xml_str):
xml_str = re.sub(r'>\s+<', '>\r\n<', xml_str)
xml_str = xml_str.replace('<BoQBody>', '<BoQBody>\r\n')
xml_str = xml_str.replace('<Itemlist>', '<Itemlist>\r\n')
xml_str = re.sub(r'(<BoQCtgy[^>]*>)', r'\g<1>\r\n', xml_str)
xml_str = re.sub(r'(<Item [^>]*>)', r'\g<1>\r\n', xml_str)
close_tags = ['</BoQBody>', '</Itemlist>', '</BoQCtgy>', '</Item>', '</QtyDeterm>']
for tag in close_tags:
xml_str = xml_str.replace(tag, '\r\n' + tag + '\r\n')
xml_str = xml_str.replace('<QtyDeterm>', '\r\n<QtyDeterm>\r\n')
while '\r\n\r\n\r\n' in xml_str:
xml_str = xml_str.replace('\r\n\r\n\r\n', '\r\n\r\n')
while xml_str.startswith('\r\n'):
xml_str = xml_str[2:]
return xml_str