import io import re from typing import Optional import pandas as pd def _parse_int(value: object) -> Optional[int]: v = pd.to_numeric(value, errors="coerce") if pd.isna(v): return None return int(v) def _strip_suffix(value: str, suffix: str) -> str: if value.endswith(suffix): return value[: -len(suffix)] return value def _base_site_name_from_enb_name(enb_name: object) -> str: if not isinstance(enb_name, str): return "" s = enb_name.strip() s = _strip_suffix(s, "_4G") s = _strip_suffix(s, "_4g") return s def _band_from_cell_name(cell_name: object) -> str: if not isinstance(cell_name, str): return "" s = cell_name.upper() for band in ["L800", "L1800", "L2600", "L2300", "L700"]: if f"_{band}" in s or s.endswith(band): return band return "" def _sector_from_cell_name(cell_name: object) -> Optional[int]: if not isinstance(cell_name, str): return None m = re.search(r"_(\d+)_L\d+\b", cell_name.upper()) if not m: return None try: return int(m.group(1)) except ValueError: return None def _dl_mimo_mode_for_band(band: str) -> str: if band in {"L2600"}: return "4x4" return "2x2" def _ul_earfcn_from_dl(dl_earfcn: int) -> int: return int(dl_earfcn) + 18000 def _build_output_columns(band_blocks: list[str]) -> list[str]: cols = [ "Unique Site ID", "Config", "Site Name", "$mrbtsid", "$lnbtsid", "$enbname", "$mcc", "$mnc", ] cell_idx = 1 for band_block_idx, _ in enumerate(band_blocks, start=1): for _slot in range(4): cols.extend( [ f"$lncelname{cell_idx}", f"$Eutracelid{cell_idx}", f"$pci{cell_idx}", f"$rsi{cell_idx}", f"$ltemaxpower{cell_idx}", ] ) cell_idx += 1 cols.extend( [ f"$tac{band_block_idx}", f"$dlMimoMode{band_block_idx}", f"$ChBw{band_block_idx}", f"$dlearfcnlte{band_block_idx}", f"$ulearfcnlte{band_block_idx}", ] ) return cols def read_ciq_4g_brut(ciq_file) -> pd.DataFrame: if hasattr(ciq_file, "seek"): ciq_file.seek(0) df = pd.read_excel(ciq_file, engine="calamine") df.columns = df.columns.astype(str).str.strip() required = [ "eNodeBName", "CellName", "DlEarfcn", "eNodeB Id", "Local Cell ID", "TAC", "Physical cell ID", "Root sequence index", ] missing = [c for c in required if c not in df.columns] if missing: raise ValueError(f"CIQ 4G brut is missing required columns: {missing}") return df def generate_ciq_4g_sheet( ciq_df: pd.DataFrame, year_suffix: str, bands: str, mcc: int, mnc: int, band_blocks: Optional[list[str]] = None, ch_bw: int = 20, lte_max_power: int = 460, ) -> pd.DataFrame: if band_blocks is None: band_blocks = ["L800", "L1800", "L2600"] output_cols = _build_output_columns(band_blocks) rows_out: list[list[object]] = [] for enb_id_raw, site_rows in ciq_df.groupby("eNodeB Id", dropna=False): enb_id = _parse_int(enb_id_raw) if enb_id is None: continue enb_name = str(site_rows["eNodeBName"].dropna().iloc[0]).strip() base_site = _base_site_name_from_enb_name(enb_name) site_name = f"{base_site}_{year_suffix}_{bands}_NA" enbname_out = f"{enb_name}_NA" row_map: dict[str, object] = { "Unique Site ID": enb_id, "Config": bands, "Site Name": site_name, "$mrbtsid": int(10000 + enb_id), "$lnbtsid": enb_id, "$enbname": enbname_out, "$mcc": int(mcc), "$mnc": str(int(mnc)).zfill(2), } cell_slot_idx = 1 for block_idx, band in enumerate(band_blocks, start=1): sub = site_rows[ site_rows["CellName"].apply(_band_from_cell_name) == band ].copy() sub["_sector"] = sub["CellName"].apply(_sector_from_cell_name) sub = sub.sort_values(by=["_sector", "Local Cell ID"], na_position="last") for slot in range(4): if slot < len(sub): r = sub.iloc[slot] cell_name = str(r.get("CellName")).strip() dl_earfcn = _parse_int(r.get("DlEarfcn")) local_cell_id = _parse_int(r.get("Local Cell ID")) pci = _parse_int(r.get("Physical cell ID")) rsi = _parse_int(r.get("Root sequence index")) row_map[f"$lncelname{cell_slot_idx}"] = f"{cell_name}_NA" row_map[f"$Eutracelid{cell_slot_idx}"] = local_cell_id row_map[f"$pci{cell_slot_idx}"] = pci row_map[f"$rsi{cell_slot_idx}"] = rsi row_map[f"$ltemaxpower{cell_slot_idx}"] = int(lte_max_power) cell_slot_idx += 1 if not sub.empty: tac = _parse_int(sub.iloc[0].get("TAC")) dl_earfcn = _parse_int(sub.iloc[0].get("DlEarfcn")) row_map[f"$tac{block_idx}"] = tac row_map[f"$dlMimoMode{block_idx}"] = _dl_mimo_mode_for_band(band) row_map[f"$ChBw{block_idx}"] = int(ch_bw) row_map[f"$dlearfcnlte{block_idx}"] = dl_earfcn row_map[f"$ulearfcnlte{block_idx}"] = ( _ul_earfcn_from_dl(dl_earfcn) if dl_earfcn is not None else None ) rows_out.append([row_map.get(c) for c in output_cols]) return pd.DataFrame(rows_out, columns=output_cols) def generate_ciq_4g_excel( ciq_file, year_suffix: str = "25", bands: str = "G9G18U9U21L8L18L26", mcc: int = 610, mnc: int = 2, ) -> tuple[dict[str, pd.DataFrame], bytes]: df_in = read_ciq_4g_brut(ciq_file) df_out = generate_ciq_4g_sheet( df_in, year_suffix=year_suffix, bands=bands, mcc=mcc, mnc=mnc, ) sheets: dict[str, pd.DataFrame] = {"CIQ_4G": df_out} bytes_io = io.BytesIO() with pd.ExcelWriter(bytes_io, engine="xlsxwriter") as writer: for sheet_name, df in sheets.items(): df.to_excel(writer, sheet_name=sheet_name, index=False) return sheets, bytes_io.getvalue()