# Source-viewer header (not part of the program): Files - 333 lines - 16 KiB - Python

import re
import time
import logging
from urllib.parse import quote, urlencode
import requests
# Module-level logger; handlers and levels are left to the application.
logger = logging.getLogger(__name__)

# Base URL (scheme + host, no trailing slash) that all request paths are appended to.
EV_HOST = "https://evergabe.telekom.de"

# User-Agent header value sent with every request of a client session.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:146.0) "
    "Gecko/20100101 Firefox/146.0"
)
class EVergabeError(Exception):
    """Raised when an interaction with the eVergabe portal fails."""
class EVergabeClient:
    """Python port of the AutoIt ``EVvergabeWebobj.au3`` functions.

    The client keeps one ``requests.Session`` and drives the portal by
    fetching HTML pages, scraping them with regular expressions and posting
    url-encoded forms.

    Args:
        username: Portal login name.
        password: Portal password.
        name: Name submitted as ``BapiEssr[sbnaman]`` when a sheet is
            created (truncated to 12 characters).
        debug_log_path: Optional file path. When set, every page fetched
            through ``_get`` is appended to that file. Defaults to ``None``
            (no dump).
    """

    # Class-level default so the dump stays off unless explicitly enabled.
    debug_log_path = None

    # Timeout in seconds applied to every HTTP request.
    _REQUEST_TIMEOUT = 30

    # Units whose form only carries a plain ``quantity`` value.
    _QUANTITY_UNITS = ("ST", "STD", "LE")

    # Units entered via a formula: unit -> (formulaSymbol, position keys that
    # feed formulaValueLength / formulaValueWidth / formulaValueDepth, in order).
    _FORMULA_UNITS = {
        "M": ("ME", ("menge",)),
        "M2": ("MF", ("laenge", "breite")),
        "M3": ("MV", ("laenge", "breite", "tiefe")),
    }
    _FORMULA_VALUE_FIELDS = (
        "formulaValueLength",
        "formulaValueWidth",
        "formulaValueDepth",
    )

    def __init__(self, username, password, name="", debug_log_path=None):
        self.username = username
        self.password = password
        self.name = name
        self.debug_log_path = debug_log_path
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": USER_AGENT,
        })
        self.session.verify = True

    # ------------------------------------------------------------------
    # Encoding / scraping helpers
    # ------------------------------------------------------------------
    def _url_encode(self, s):
        """Return ``str(s)`` percent-encoded with no characters left safe."""
        return quote(str(s), safe="")

    @staticmethod
    def _form_encode(fields):
        """Encode ``(name, value)`` pairs as a form body.

        Names and values are both percent-encoded (space becomes ``%20``),
        the pair order is preserved and repeated names are kept.
        """
        return urlencode(list(fields), quote_via=quote, safe="")

    @staticmethod
    def _meta_content(html, meta_name):
        """Return the ``content`` of ``<meta name=meta_name>`` or ``None``.

        Works regardless of the order of the ``name`` and ``content``
        attributes inside the tag.
        """
        name_pattern = r'(?i)\bname="%s"' % re.escape(meta_name)
        for tag in re.findall(r'(?i)<meta\b[^>]*>', html):
            if re.search(name_pattern, tag):
                content = re.search(r'(?i)\bcontent="([^"]+)"', tag)
                if content:
                    return content.group(1)
        return None

    def _extract_csrf(self, html):
        """Return ``(field_name, token)`` of the CSRF token found in *html*.

        The ``csrf-token`` meta tag is preferred; its field name is read from
        the ``csrf-param`` meta tag and falls back to ``"_csrf"``. If no meta
        tag exists, the first form field whose name contains ``csrf`` is used.

        Raises:
            EVergabeError: If no token can be found.
        """
        token = self._meta_content(html, "csrf-token")
        if token:
            return self._meta_content(html, "csrf-param") or "_csrf", token
        inp = re.search(
            r'(?i)name="([^"]*csrf[^"]*)"[^>]*value="([^"]+)"', html
        )
        if inp:
            return inp.group(1), inp.group(2)
        raise EVergabeError("CSRF-Token nicht gefunden")

    # ------------------------------------------------------------------
    # HTTP primitives
    # ------------------------------------------------------------------
    def login(self):
        """Log in with the stored credentials.

        Returns:
            ``True`` on success.

        Raises:
            EVergabeError: If the response is neither a redirect (301/302/303)
                nor a page containing ``Logout``.
        """
        # Local import: the module header only imports quote/urlencode.
        from urllib.parse import urljoin

        login_url = f"{EV_HOST}/public/login"
        html = self._get(login_url)
        csrf_field, csrf_value = self._extract_csrf(html)
        body = self._form_encode([
            ("LoginForm[username]", self.username),
            ("LoginForm[password]", self.password),
            (csrf_field, csrf_value),
        ])
        resp = self.session.post(
            login_url,
            data=body,
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            allow_redirects=False,
            timeout=self._REQUEST_TIMEOUT,
        )
        redirected = resp.status_code in (301, 302, 303)
        if not redirected and "Logout" not in resp.text:
            raise EVergabeError(f"Login fehlgeschlagen, HTTP {resp.status_code}")
        if redirected:
            # Follow the redirect manually; urljoin resolves absolute,
            # host-relative and path-relative Location values alike.
            target = urljoin(login_url, resp.headers.get("Location", "/"))
            self.session.get(
                target, allow_redirects=True, timeout=self._REQUEST_TIMEOUT
            )
        logger.info("EVergabe Login erfolgreich")
        return True

    def _get(self, url, referer=""):
        """GET *url* and return the body with every ``amp;`` removed.

        Removing ``amp;`` turns ``&amp;`` into ``&`` so that scraped links can
        be matched and reused directly. When ``debug_log_path`` is set the
        response is also appended to that file.
        """
        headers = {"Referer": referer} if referer else {}
        resp = self.session.get(url, headers=headers, timeout=self._REQUEST_TIMEOUT)
        text = resp.text.replace("amp;", "")
        if self.debug_log_path:
            separator = "=" * 80
            with open(self.debug_log_path, "a", encoding="utf-8") as f:
                f.write(
                    f"\n{separator}\nGET {url}\nReferer: {referer}\n"
                    f"Status: {resp.status_code}\n{separator}\n{text}\n\n"
                )
        return text

    def _post(self, url, body, referer=""):
        """POST the url-encoded *body* and return the text with ``amp;`` removed."""
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        if referer:
            headers["Referer"] = referer
        resp = self.session.post(
            url, data=body, headers=headers, timeout=self._REQUEST_TIMEOUT
        )
        return resp.text.replace("amp;", "")

    # === _EV_Search_SM_obj ===
    @staticmethod
    def _extract_search_columns(html):
        """Scrape the result page into one list of matches per column."""
        return {
            "titles": re.findall(r'(?m)no-wrap" title="([^"]+)', html),
            "bedarfnr": re.findall(r'Bedarfsnr.: 0([^<]+)<', html),
            "belegnr": re.findall(r'(42[^\/]+)\/0001', html),
            "detail_ids": re.findall(r'call/details\?id=([^&]+)&c=1', html),
            "rv": re.findall(r'/framework-agreement/details\?id=[^"]+">([^<]+)', html),
            # NOTE(review): this is every dd.mm.yyyy on the page, so entry i
            # is simply the i-th date in the document - confirm that is the
            # intended "Belegeingang" for rows after the first.
            "beleg_eingang": re.findall(r'(\d{2}\.\d{2}\.\d{4})', html),
            "ausfuehrungsfrist": re.findall(
                r'(\d{2}\.\d{2}\.\d{4})\s*</td>\s*<td>\s*<a href', html
            ),
            # NOTE(review): same pattern as "rv" but without a capture group,
            # so the value is the whole matched link fragment - presumably a
            # placeholder; confirm what "status" should contain.
            "status": re.findall(r'/framework-agreement/details\?id=[^"]+">[^<]+', html),
        }

    @staticmethod
    def _rows_from_columns(columns):
        """Combine the per-column match lists into one dict per result row.

        The row count is the longest of titles / bedarfnr / belegnr /
        detail_ids; missing cells become ``""``.
        """
        def cell(key, i):
            values = columns[key]
            return values[i] if i < len(values) else ""

        row_count = max(
            len(columns[key])
            for key in ("titles", "bedarfnr", "belegnr", "detail_ids")
        )
        rows = []
        for i in range(row_count):
            title = (
                cell("titles", i)
                .replace("SM Auftragsnummer: 000", " SM ")
                .replace("\r", " ")
                .replace("\n", " ")
            )
            rows.append({
                "title": title,
                "bedarf_nr": cell("bedarfnr", i),
                "beleg_nr": cell("belegnr", i),
                "detail_id": cell("detail_ids", i),
                "rv": cell("rv", i),
                "beleg_eingang": cell("beleg_eingang", i),
                "ausfuehrungsfrist": cell("ausfuehrungsfrist", i),
                "status": cell("status", i),
            })
        return rows

    def search_sm(self, sm_nr):
        """Search the framework-agreement calls for *sm_nr*.

        Returns:
            A list of result dicts (keys: title, bedarf_nr, beleg_nr,
            detail_id, rv, beleg_eingang, ausfuehrungsfrist, status), or an
            empty list when the number does not occur in the response.
        """
        sm_nr = str(sm_nr)  # allow numeric input for the substring check below
        url = (
            f"{EV_HOST}/framework-agreement-call"
            f"?OrderRecallSearch%5Bhead_line%5D={self._url_encode(sm_nr)}"
            f"&OrderRecallSearch%5Bdocument_no%5D="
            f"&OrderRecallSearch%5Bincoming_date%5D="
            f"&OrderRecallSearch%5Border_date%5D="
            f"&OrderRecallSearch%5Bgeneral_agreement_id%5D="
            f"&OrderRecallSearch%5Bdocument_state%5D=-1"
        )
        referer = f"{EV_HOST}/framework-agreement-call"
        html = self._get(url, referer)
        if sm_nr not in html:
            return []
        columns = self._extract_search_columns(html)
        titles = columns["titles"]
        detail_ids = columns["detail_ids"]
        logger.info(
            "search_sm: titles=%d, bedarfnr=%d, belegnr=%d, detail_ids=%d",
            len(titles), len(columns["bedarfnr"]),
            len(columns["belegnr"]), len(detail_ids),
        )
        logger.info("search_sm: detail_ids=%s", detail_ids)
        logger.info(
            "search_sm: titles_sample=%s", titles[0][:100] if titles else "none"
        )
        return self._rows_from_columns(columns)

    # === _EV_Aspa_obj ===
    def get_aspa(self, details_id):
        """Scrape the contact person from the details page of *details_id*.

        Returns:
            Dict with ``name``, ``email`` and ``tel``; values that cannot be
            found are ``""``. When the page holds several ``mailto:`` links
            the second one is used.
        """
        url = f"{EV_HOST}/framework-agreement-call/details?id={details_id}&c=1"
        referer = f"{EV_HOST}/framework-agreement-call/"
        html = self._get(url, referer)
        name = re.search(r'Name\s*</th>\s*<td>\s*([^<]+?)\s*</td>', html)
        emails = re.findall(r'mailto:([^"?\s&]+)', html)
        tel = re.search(r'th>Telefon</th><td>([^<]+)', html)
        if len(emails) > 1:
            email_val = emails[1].strip()
        elif emails:
            email_val = emails[0].strip()
        else:
            email_val = ""
        return {
            "name": name.group(1).strip() if name else "",
            "email": email_val,
            "tel": tel.group(1).strip() if tel else "",
        }

    # === _EV_Hole_Kopfdaten_obj (combined) ===
    def hole_kopfdaten(self, sm_nr):
        """Log in, search *sm_nr* and return the header data of the first hit.

        Raises:
            EVergabeError: If login fails or the search yields no result.
        """
        self.login()
        results = self.search_sm(sm_nr)
        if not results:
            raise EVergabeError(f"Kein Treffer für SM-Nr: {sm_nr}")
        first = results[0]
        logger.info(
            "SM-Suche: title=%s, detail_id=%s, beleg_eingang=%s, ausfuehrungsfrist=%s",
            first["title"][:50], first["detail_id"],
            first["beleg_eingang"], first["ausfuehrungsfrist"],
        )
        aspa = self.get_aspa(first["detail_id"])
        return {
            "bezeichnung": first["title"].strip(),
            "sm_nr": first["bedarf_nr"],
            "abruf_nr": first["beleg_nr"],
            "detail_id": first["detail_id"],
            "rv": first["rv"],
            "beleg_eingang": first["beleg_eingang"],
            "ausfuehrungsfrist": first["ausfuehrungsfrist"],
            "status": first["status"],
            "ansprechpartner_name": aspa["name"],
            "ansprechpartner_email": aspa["email"],
            "ansprechpartner_tel": aspa["tel"],
        }

    # === _EV_Pos_eintragen_obj (transfer measurement sheet) ===
    def _parse_sheet_id(self, html):
        """Return the ``sheetId`` of the link labelled ``Kopfdaten``.

        The id stops at the first ``"`` or ``&`` so that other ``sheetId=``
        links on the same line cannot leak into the captured value.

        Raises:
            EVergabeError: If no such link is present.
        """
        match = re.search(r'sheetId=([^"&]+)[^"]*">Kopfdaten', html)
        if not match:
            raise EVergabeError(f"sheetID nicht gefunden. Response: {html[:200]}")
        return match.group(1)

    def _position_fields(self, pos, abschnitt, pos_id):
        """Return the ``ServicePosition[0][...]`` form pairs for *pos*.

        Returns ``None`` when the position's ``einheit`` (default ``"ST"``)
        is not one of ST, STD, LE, M, M2 or M3.
        """
        einheit = pos.get("einheit", "ST")
        if einheit in self._QUANTITY_UNITS:
            measures = [("quantity", pos.get("menge", ""))]
        elif einheit in self._FORMULA_UNITS:
            symbol, value_keys = self._FORMULA_UNITS[einheit]
            measures = [
                ("formulaSymbol", symbol),
                ("formulaValueMultiplier", pos.get("faktor", "1")),
            ]
            measures.extend(
                (field, pos.get(key, ""))
                for field, key in zip(self._FORMULA_VALUE_FIELDS, value_keys)
            )
        else:
            return None
        named = (
            [("sectionText", abschnitt)]
            + measures
            + [("longText", pos.get("langtext", "")), ("id", pos_id)]
        )
        return [(f"ServicePosition[0][{key}]", str(value)) for key, value in named]

    def _upload_position(self, idx, pos, sheet_id, sheet_index_url, default_abschnitt):
        """Enter one position into the sheet and return its result dict."""
        pos_nr = pos.get("pos_nr", "")
        index_url = f"{EV_HOST}/sheet-position/index?c=1&sheetId={sheet_id}"

        # Step 1: select the position number on the position index page.
        index_html = self._get(index_url, sheet_index_url)
        csrf_field, csrf_value = self._extract_csrf(index_html)
        select_body = self._form_encode([
            (csrf_field, csrf_value),
            ("ServicePosition[number]", pos_nr),
            ("insertPosition", "0"),
        ])
        selected_html = self._post(index_url, select_body, index_url)
        pos_id_match = re.search(r'positionId=([^&"]+)', selected_html)
        if not pos_id_match:
            return {"pos_nr": pos_nr, "status": "error", "msg": "positionId nicht gefunden"}
        pos_id = pos_id_match.group(1)

        # Step 2: open the entry form to obtain a fresh CSRF token.
        form_url = (
            f"{EV_HOST}/sheet-position/create?insertPosition=0"
            f"&positionId={pos_id}&c=1&sheetId={sheet_id}"
        )
        form_html = self._get(form_url, sheet_index_url)
        csrf_field, csrf_value = self._extract_csrf(form_html)

        abschnitt = pos.get("bauabschnitt", default_abschnitt)[:25]
        fields = self._position_fields(pos, abschnitt, pos_id)
        if fields is None:
            einheit = pos.get("einheit", "ST")
            return {"pos_nr": pos_nr, "status": "error", "msg": f"Unbekannte Einheit: {einheit}"}

        # Step 3: submit the quantities.
        save_body = self._form_encode(
            [(csrf_field, csrf_value)]
            + fields
            + [
                ("clientId", "1"),
                ("sheetId", sheet_id),
                ("insertPosition", idx),
                ("save", ""),
            ]
        )
        result_html = self._post(
            f"{EV_HOST}/sheet-position/create?insertPosition={idx}"
            f"&positionId={pos_id}&c=1&sheetId={sheet_id}",
            save_body,
            form_url,
        )
        if "erfolgreich gespeichert" in result_html:
            result = {"pos_nr": pos_nr, "status": "ok", "msg": "eingetragen"}
        else:
            result = {"pos_nr": pos_nr, "status": "error", "msg": "Fehlgeschlagen"}
        time.sleep(0.5)  # short pause between submitted positions
        return result

    def aufmass_uebertragen(self, sm_nr, positionen, bauabschnitt="", leist_zeitv="", leist_zeitb="", leistungsort="", teilaufmass=False, schlussaufmass=False):
        """Create a sheet for *sm_nr* and enter all *positionen*.

        Args:
            sm_nr: Number to search for; the first search hit is used.
            positionen: Iterable of dicts. Keys read: ``pos_nr``, ``einheit``
                (default ``"ST"``), ``menge``, ``faktor``, ``laenge``,
                ``breite``, ``tiefe``, ``langtext``, ``bauabschnitt``.
            bauabschnitt: Sent as ``txz01`` (max. 39 chars) and used as the
                default section text of each position (max. 25 chars).
            leist_zeitv: Sent as ``lzvon``.
            leist_zeitb: Sent as ``lzbis``.
            leistungsort: Sent as ``dlort`` (max. 25 chars).
            teilaufmass: Accepted for interface compatibility; not used.
            schlussaufmass: When true, ``final`` is sent as ``"X"``.

        Returns:
            One dict per position with ``pos_nr``, ``status`` (``"ok"`` or
            ``"error"``) and ``msg``.

        Raises:
            EVergabeError: On failed login, no search hit, a page reporting
                ``LERF nicht möglich``, a missing CSRF token or a missing
                sheet id.
        """
        self.login()
        results = self.search_sm(sm_nr)
        if not results:
            raise EVergabeError(f"Kein Treffer für SM-Nr: {sm_nr}")
        details_id = results[0]["detail_id"]
        logger.info("DetailsID: %s", details_id)

        details_url = f"{EV_HOST}/framework-agreement-call/details?id={details_id}&c=1"
        sheet_index_url = f"{EV_HOST}/sheet/index?c=1&importId={details_id}"

        html = self._get(details_url, f"{EV_HOST}/framework-agreement-call")
        if "LERF nicht möglich" in html:
            raise EVergabeError("LERF nicht möglich")
        # Visited only to reproduce the navigation sequence; the body is unused.
        self._get(sheet_index_url, details_url)
        html = self._get(
            f"{EV_HOST}/sheet/create-sheet?c=1&id={details_id}",
            sheet_index_url,
        )
        csrf_field, csrf_value = self._extract_csrf(html)

        bauabschnitt_clean = bauabschnitt[:39]
        final = "X" if schlussaufmass else ""
        body = self._form_encode([
            # The token is sent exactly as extracted (it already contains its
            # base64 padding), the same way login() submits it.
            (csrf_field, csrf_value),
            # The field is submitted twice, as in the original request.
            ("BapiEssr[final]", final),
            ("BapiEssr[final]", final),
            ("BapiEssr[lzvon]", leist_zeitv),
            ("BapiEssr[lzbis]", leist_zeitb),
            ("BapiEssr[dlort]", leistungsort[:25]),
            ("BapiEssr[sbnaman]", self.name[:12]),
            ("BapiEssr[txz01]", bauabschnitt_clean),
            ("BapiEssr[description]", f"{leistungsort} {bauabschnitt} SM {sm_nr}"),
            ("save sheets", ""),
        ])
        # NOTE(review): the form is fetched with "id=" but posted with "iid=";
        # kept as found - confirm against the portal.
        create_sheet_url = f"{EV_HOST}/sheet/create-sheet?c=1&iid={details_id}"
        html = self._post(create_sheet_url, body, create_sheet_url)
        sheet_id = self._parse_sheet_id(html)
        logger.info("SheetID: %s", sheet_id)

        ergebnisse = []
        for idx, pos in enumerate(positionen):
            ergebnisse.append(
                self._upload_position(
                    idx, pos, sheet_id, sheet_index_url, bauabschnitt_clean
                )
            )
        return ergebnisse