db_query / panel_app /trafic_analysis_panel.py
DavMelchi's picture
Add KPI Health Check Panel with multi-RAT analysis, persistent degradation detection, and Excel export functionality
a8899bd
import io
import os
import sys
import zipfile
from datetime import date, timedelta
import numpy as np
import pandas as pd
import panel as pn
import plotly.express as px
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
from panel_app.convert_to_excel_panel import write_dfs_to_excel
from utils.utils_vars import get_physical_db
pn.extension(
"plotly",
"tabulator",
raw_css=[
":fullscreen { background-color: white; overflow: auto; }",
"::backdrop { background-color: white; }",
".plot-fullscreen-wrapper:fullscreen { padding: 20px; display: flex; flex-direction: column; }",
".plot-fullscreen-wrapper:fullscreen > * { height: 100% !important; width: 100% !important; }",
],
)
def read_fileinput_to_df(file_input: pn.widgets.FileInput) -> pd.DataFrame | None:
"""Read a Panel FileInput (ZIP or CSV) into a DataFrame.
Returns None if no file is provided.
"""
if file_input is None or not file_input.value:
return None
filename = (file_input.filename or "").lower()
data = io.BytesIO(file_input.value)
if filename.endswith(".zip"):
with zipfile.ZipFile(data) as z:
csv_files = [f for f in z.namelist() if f.lower().endswith(".csv")]
if not csv_files:
raise ValueError("No CSV file found in the ZIP archive")
with z.open(csv_files[0]) as f:
return pd.read_csv(f, encoding="latin1", sep=";", low_memory=False)
elif filename.endswith(".csv"):
return pd.read_csv(data, encoding="latin1", sep=";", low_memory=False)
else:
raise ValueError("Unsupported file format. Please upload a ZIP or CSV file.")
def extract_code(name):
name = name.replace(" ", "_") if isinstance(name, str) else None
if name and len(name) >= 10:
try:
return int(name.split("_")[0])
except ValueError:
return None
return None
def preprocess_2g(df: pd.DataFrame) -> pd.DataFrame:
df = df[df["BCF name"].str.len() >= 10].copy()
df["2g_data_trafic"] = ((df["TRAFFIC_PS DL"] + df["PS_UL_Load"]) / 1000).round(1)
df.rename(columns={"2G_Carried Traffic": "2g_voice_trafic"}, inplace=True)
df["code"] = df["BCF name"].apply(extract_code)
df["code"] = pd.to_numeric(df["code"], errors="coerce")
df = df[df["code"].notna()]
df["code"] = df["code"].astype(int)
date_format = (
"%m.%d.%Y %H:%M:%S" if len(df["PERIOD_START_TIME"].iat[0]) > 10 else "%m.%d.%Y"
)
df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
if "TCH availability ratio" in df.columns:
df["2g_tch_avail"] = pd.to_numeric(
df["TCH availability ratio"], errors="coerce"
)
agg_dict = {
"2g_data_trafic": "sum",
"2g_voice_trafic": "sum",
}
if "2g_tch_avail" in df.columns:
agg_dict["2g_tch_avail"] = "mean"
df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
return df
def preprocess_3g(df: pd.DataFrame) -> pd.DataFrame:
df = df[df["WBTS name"].str.len() >= 10].copy()
df["code"] = df["WBTS name"].apply(extract_code)
df["code"] = pd.to_numeric(df["code"], errors="coerce")
df = df[df["code"].notna()]
df["code"] = df["code"].astype(int)
date_format = (
"%m.%d.%Y %H:%M:%S" if len(df["PERIOD_START_TIME"].iat[0]) > 10 else "%m.%d.%Y"
)
df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
df.rename(
columns={
"Total CS traffic - Erl": "3g_voice_trafic",
"Total_Data_Traffic": "3g_data_trafic",
},
inplace=True,
)
kpi_col = None
for col in df.columns:
if "cell availability" in str(col).lower():
kpi_col = col
break
if kpi_col is not None:
df["3g_cell_avail"] = pd.to_numeric(df[kpi_col], errors="coerce")
agg_dict = {
"3g_voice_trafic": "sum",
"3g_data_trafic": "sum",
}
if "3g_cell_avail" in df.columns:
agg_dict["3g_cell_avail"] = "mean"
df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
return df
def preprocess_lte(df: pd.DataFrame) -> pd.DataFrame:
df = df[df["LNBTS name"].str.len() >= 10].copy()
df["lte_data_trafic"] = (
df["4G/LTE DL Traffic Volume (GBytes)"]
+ df["4G/LTE UL Traffic Volume (GBytes)"]
)
df["code"] = df["LNBTS name"].apply(extract_code)
df["code"] = pd.to_numeric(df["code"], errors="coerce")
df = df[df["code"].notna()]
df["code"] = df["code"].astype(int)
date_format = (
"%m.%d.%Y %H:%M:%S" if len(df["PERIOD_START_TIME"].iat[0]) > 10 else "%m.%d.%Y"
)
df["date"] = pd.to_datetime(df["PERIOD_START_TIME"], format=date_format)
df["ID"] = df["date"].astype(str) + "_" + df["code"].astype(str)
if "Cell Avail excl BLU" in df.columns:
df["lte_cell_avail"] = pd.to_numeric(df["Cell Avail excl BLU"], errors="coerce")
agg_dict = {"lte_data_trafic": "sum"}
if "lte_cell_avail" in df.columns:
agg_dict["lte_cell_avail"] = "mean"
df = df.groupby(["date", "ID", "code"], as_index=False).agg(agg_dict)
return df
def merge_and_compare(df_2g, df_3g, df_lte, pre_range, post_range, last_period_range):
physical_db = get_physical_db()
physical_db["code"] = physical_db["Code_Sector"].str.split("_").str[0]
physical_db["code"] = (
pd.to_numeric(physical_db["code"], errors="coerce").fillna(0).astype(int)
)
physical_db = physical_db[["code", "Longitude", "Latitude", "City"]]
physical_db = physical_db.drop_duplicates(subset="code")
df = pd.merge(df_2g, df_3g, on=["date", "ID", "code"], how="outer")
df = pd.merge(df, df_lte, on=["date", "ID", "code"], how="outer")
for col in [
"2g_data_trafic",
"2g_voice_trafic",
"3g_voice_trafic",
"3g_data_trafic",
"lte_data_trafic",
]:
if col not in df:
df[col] = 0
kpi_masks = {}
for kpi_col in ["2g_tch_avail", "3g_cell_avail", "lte_cell_avail"]:
if kpi_col in df.columns:
kpi_masks[kpi_col] = df[kpi_col].notna()
df.fillna(0, inplace=True)
for kpi_col, mask in kpi_masks.items():
df.loc[~mask, kpi_col] = np.nan
df["total_voice_trafic"] = df["2g_voice_trafic"] + df["3g_voice_trafic"]
df["total_data_trafic"] = (
df["2g_data_trafic"] + df["3g_data_trafic"] + df["lte_data_trafic"]
)
df = pd.merge(df, physical_db, on=["code"], how="left")
pre_start, pre_end = pd.to_datetime(pre_range[0]), pd.to_datetime(pre_range[1])
post_start, post_end = pd.to_datetime(post_range[0]), pd.to_datetime(post_range[1])
last_period_start, last_period_end = pd.to_datetime(
last_period_range[0]
), pd.to_datetime(last_period_range[1])
last_period = df[
(df["date"] >= last_period_start) & (df["date"] <= last_period_end)
]
def assign_period(x):
if pre_start <= x <= pre_end:
return "pre"
if post_start <= x <= post_end:
return "post"
return "other"
df["period"] = df["date"].apply(assign_period)
comparison = df[df["period"].isin(["pre", "post"])]
sum_pivot = (
comparison.groupby(["code", "period"])[
["total_voice_trafic", "total_data_trafic"]
]
.sum()
.unstack()
)
sum_pivot.columns = [f"{metric}_{period}" for metric, period in sum_pivot.columns]
sum_pivot = sum_pivot.reset_index()
sum_pivot["total_voice_trafic_diff"] = (
sum_pivot["total_voice_trafic_post"] - sum_pivot["total_voice_trafic_pre"]
)
sum_pivot["total_data_trafic_diff"] = (
sum_pivot["total_data_trafic_post"] - sum_pivot["total_data_trafic_pre"]
)
for metric in ["total_voice_trafic", "total_data_trafic"]:
sum_pivot[f"{metric}_diff_pct"] = (
(sum_pivot.get(f"{metric}_post", 0) - sum_pivot.get(f"{metric}_pre", 0))
/ sum_pivot.get(f"{metric}_pre", 1)
) * 100
sum_order = [
"code",
"total_voice_trafic_pre",
"total_voice_trafic_post",
"total_voice_trafic_diff",
"total_voice_trafic_diff_pct",
"total_data_trafic_pre",
"total_data_trafic_post",
"total_data_trafic_diff",
"total_data_trafic_diff_pct",
]
sum_existing_cols = [col for col in sum_order if col in sum_pivot.columns]
sum_remaining_cols = [
col for col in sum_pivot.columns if col not in sum_existing_cols
]
sum_pivot = sum_pivot[sum_existing_cols + sum_remaining_cols]
avg_pivot = (
comparison.groupby(["code", "period"])[
["total_voice_trafic", "total_data_trafic"]
]
.mean()
.unstack()
)
avg_pivot.columns = [f"{metric}_{period}" for metric, period in avg_pivot.columns]
avg_pivot = avg_pivot.reset_index()
avg_pivot["total_voice_trafic_diff"] = (
avg_pivot["total_voice_trafic_post"] - avg_pivot["total_voice_trafic_pre"]
)
avg_pivot["total_data_trafic_diff"] = (
avg_pivot["total_data_trafic_post"] - avg_pivot["total_data_trafic_pre"]
)
for metric in ["total_voice_trafic", "total_data_trafic"]:
avg_pivot[f"{metric}_diff_pct"] = (
(avg_pivot.get(f"{metric}_post", 0) - avg_pivot.get(f"{metric}_pre", 0))
/ avg_pivot.get(f"{metric}_pre", 1)
) * 100
avg_pivot = avg_pivot.rename(
columns={
"total_voice_trafic_pre": "avg_voice_trafic_pre",
"total_voice_trafic_post": "avg_voice_trafic_post",
"total_voice_trafic_diff": "avg_voice_trafic_diff",
"total_voice_trafic_diff_pct": "avg_voice_trafic_diff_pct",
"total_data_trafic_pre": "avg_data_trafic_pre",
"total_data_trafic_post": "avg_data_trafic_post",
"total_data_trafic_diff": "avg_data_trafic_diff",
"total_data_trafic_diff_pct": "avg_data_trafic_diff_pct",
}
)
avg_order = [
"code",
"avg_voice_trafic_pre",
"avg_voice_trafic_post",
"avg_voice_trafic_diff",
"avg_voice_trafic_diff_pct",
"avg_data_trafic_pre",
"avg_data_trafic_post",
"avg_data_trafic_diff",
"avg_data_trafic_diff_pct",
]
avg_existing_cols = [col for col in avg_order if col in avg_pivot.columns]
avg_remaining_cols = [
col for col in avg_pivot.columns if col not in avg_existing_cols
]
avg_pivot = avg_pivot[avg_existing_cols + avg_remaining_cols]
return df, last_period, sum_pivot.round(2), avg_pivot.round(2)
def analyze_2g_availability(df: pd.DataFrame, sla_2g: float):
avail_col = "2g_tch_avail"
if avail_col not in df.columns or "period" not in df.columns:
return None, None
df_2g = df[df[avail_col].notna()].copy()
df_2g = df_2g[df_2g["period"].isin(["pre", "post"])]
if df_2g.empty:
return None, None
site_pivot = df_2g.groupby(["code", "period"])[avail_col].mean().unstack()
site_pivot = site_pivot.rename(
columns={"pre": "tch_avail_pre", "post": "tch_avail_post"}
)
if "tch_avail_pre" not in site_pivot.columns:
site_pivot["tch_avail_pre"] = pd.NA
if "tch_avail_post" not in site_pivot.columns:
site_pivot["tch_avail_post"] = pd.NA
site_pivot["tch_avail_diff"] = (
site_pivot["tch_avail_post"] - site_pivot["tch_avail_pre"]
)
site_pivot["pre_ok_vs_sla"] = site_pivot["tch_avail_pre"] >= sla_2g
site_pivot["post_ok_vs_sla"] = site_pivot["tch_avail_post"] >= sla_2g
site_pivot = site_pivot.reset_index()
summary_rows = []
for period_label, col_name in [
("pre", "tch_avail_pre"),
("post", "tch_avail_post"),
]:
series = site_pivot[col_name].dropna()
total_cells = series.shape[0]
if total_cells == 0:
summary_rows.append(
{
"period": period_label,
"cells": 0,
"avg_availability": pd.NA,
"median_availability": pd.NA,
"p05_availability": pd.NA,
"p95_availability": pd.NA,
"min_availability": pd.NA,
"max_availability": pd.NA,
"cells_ge_sla": 0,
"cells_lt_sla": 0,
"pct_cells_ge_sla": pd.NA,
}
)
continue
cells_ge_sla = (series >= sla_2g).sum()
cells_lt_sla = (series < sla_2g).sum()
summary_rows.append(
{
"period": period_label,
"cells": int(total_cells),
"avg_availability": series.mean(),
"median_availability": series.median(),
"p05_availability": series.quantile(0.05),
"p95_availability": series.quantile(0.95),
"min_availability": series.min(),
"max_availability": series.max(),
"cells_ge_sla": int(cells_ge_sla),
"cells_lt_sla": int(cells_lt_sla),
"pct_cells_ge_sla": cells_ge_sla / total_cells * 100,
}
)
summary_df = pd.DataFrame(summary_rows)
return summary_df, site_pivot
def analyze_3g_availability(df: pd.DataFrame, sla_3g: float):
avail_col = "3g_cell_avail"
if avail_col not in df.columns or "period" not in df.columns:
return None, None
df_3g = df[df[avail_col].notna()].copy()
df_3g = df_3g[df_3g["period"].isin(["pre", "post"])]
if df_3g.empty:
return None, None
site_pivot = df_3g.groupby(["code", "period"])[avail_col].mean().unstack()
site_pivot = site_pivot.rename(
columns={"pre": "cell_avail_pre", "post": "cell_avail_post"}
)
if "cell_avail_pre" not in site_pivot.columns:
site_pivot["cell_avail_pre"] = pd.NA
if "cell_avail_post" not in site_pivot.columns:
site_pivot["cell_avail_post"] = pd.NA
site_pivot["cell_avail_diff"] = (
site_pivot["cell_avail_post"] - site_pivot["cell_avail_pre"]
)
site_pivot["pre_ok_vs_sla"] = site_pivot["cell_avail_pre"] >= sla_3g
site_pivot["post_ok_vs_sla"] = site_pivot["cell_avail_post"] >= sla_3g
site_pivot = site_pivot.reset_index()
summary_rows = []
for period_label, col_name in [
("pre", "cell_avail_pre"),
("post", "cell_avail_post"),
]:
series = site_pivot[col_name].dropna()
total_cells = series.shape[0]
if total_cells == 0:
summary_rows.append(
{
"period": period_label,
"cells": 0,
"avg_availability": pd.NA,
"median_availability": pd.NA,
"p05_availability": pd.NA,
"p95_availability": pd.NA,
"min_availability": pd.NA,
"max_availability": pd.NA,
"cells_ge_sla": 0,
"cells_lt_sla": 0,
"pct_cells_ge_sla": pd.NA,
}
)
continue
cells_ge_sla = (series >= sla_3g).sum()
cells_lt_sla = (series < sla_3g).sum()
summary_rows.append(
{
"period": period_label,
"cells": int(total_cells),
"avg_availability": series.mean(),
"median_availability": series.median(),
"p05_availability": series.quantile(0.05),
"p95_availability": series.quantile(0.95),
"min_availability": series.min(),
"max_availability": series.max(),
"cells_ge_sla": int(cells_ge_sla),
"cells_lt_sla": int(cells_lt_sla),
"pct_cells_ge_sla": cells_ge_sla / total_cells * 100,
}
)
summary_df = pd.DataFrame(summary_rows)
return summary_df, site_pivot
def analyze_lte_availability(df: pd.DataFrame, sla_lte: float):
avail_col = "lte_cell_avail"
if avail_col not in df.columns or "period" not in df.columns:
return None, None
df_lte = df[df[avail_col].notna()].copy()
df_lte = df_lte[df_lte["period"].isin(["pre", "post"])]
if df_lte.empty:
return None, None
site_pivot = df_lte.groupby(["code", "period"])[avail_col].mean().unstack()
site_pivot = site_pivot.rename(
columns={"pre": "lte_avail_pre", "post": "lte_avail_post"}
)
if "lte_avail_pre" not in site_pivot.columns:
site_pivot["lte_avail_pre"] = pd.NA
if "lte_avail_post" not in site_pivot.columns:
site_pivot["lte_avail_post"] = pd.NA
site_pivot["lte_avail_diff"] = (
site_pivot["lte_avail_post"] - site_pivot["lte_avail_pre"]
)
site_pivot["pre_ok_vs_sla"] = site_pivot["lte_avail_pre"] >= sla_lte
site_pivot["post_ok_vs_sla"] = site_pivot["lte_avail_post"] >= sla_lte
site_pivot = site_pivot.reset_index()
summary_rows = []
for period_label, col_name in [
("pre", "lte_avail_pre"),
("post", "lte_avail_post"),
]:
series = site_pivot[col_name].dropna()
total_cells = series.shape[0]
if total_cells == 0:
summary_rows.append(
{
"period": period_label,
"cells": 0,
"avg_availability": pd.NA,
"median_availability": pd.NA,
"p05_availability": pd.NA,
"p95_availability": pd.NA,
"min_availability": pd.NA,
"max_availability": pd.NA,
"cells_ge_sla": 0,
"cells_lt_sla": 0,
"pct_cells_ge_sla": pd.NA,
}
)
continue
cells_ge_sla = (series >= sla_lte).sum()
cells_lt_sla = (series < sla_lte).sum()
summary_rows.append(
{
"period": period_label,
"cells": int(total_cells),
"avg_availability": series.mean(),
"median_availability": series.median(),
"p05_availability": series.quantile(0.05),
"p95_availability": series.quantile(0.95),
"min_availability": series.min(),
"max_availability": series.max(),
"cells_ge_sla": int(cells_ge_sla),
"cells_lt_sla": int(cells_lt_sla),
"pct_cells_ge_sla": cells_ge_sla / total_cells * 100,
}
)
summary_df = pd.DataFrame(summary_rows)
return summary_df, site_pivot
def analyze_multirat_availability(
df: pd.DataFrame, sla_2g: float, sla_3g: float, sla_lte: float
):
if "period" not in df.columns:
return None
rat_cols = []
if "2g_tch_avail" in df.columns:
rat_cols.append("2g_tch_avail")
if "3g_cell_avail" in df.columns:
rat_cols.append("3g_cell_avail")
if "lte_cell_avail" in df.columns:
rat_cols.append("lte_cell_avail")
if not rat_cols:
return None
agg_dict = {col: "mean" for col in rat_cols}
df_pre = df[df["period"] == "pre"]
df_post = df[df["period"] == "post"]
pre = df_pre.groupby("code", as_index=False).agg(agg_dict)
post = df_post.groupby("code", as_index=False).agg(agg_dict)
rename_map_pre = {
"2g_tch_avail": "2g_avail_pre",
"3g_cell_avail": "3g_avail_pre",
"lte_cell_avail": "lte_avail_pre",
}
rename_map_post = {
"2g_tch_avail": "2g_avail_post",
"3g_cell_avail": "3g_avail_post",
"lte_cell_avail": "lte_avail_post",
}
pre = pre.rename(columns=rename_map_pre)
post = post.rename(columns=rename_map_post)
multi = pd.merge(pre, post, on="code", how="outer")
if not df_post.empty and {
"total_voice_trafic",
"total_data_trafic",
}.issubset(df_post.columns):
post_traffic = (
df_post.groupby("code", as_index=False)[
["total_voice_trafic", "total_data_trafic"]
]
.sum()
.rename(
columns={
"total_voice_trafic": "post_total_voice_trafic",
"total_data_trafic": "post_total_data_trafic",
}
)
)
multi = pd.merge(multi, post_traffic, on="code", how="left")
if "City" in df.columns:
city_df = df[["code", "City"]].drop_duplicates("code")
multi = pd.merge(multi, city_df, on="code", how="left")
def _ok_flag(series: pd.Series, sla: float) -> pd.Series:
if series.name not in multi.columns:
return pd.Series([pd.NA] * len(multi), index=multi.index)
ok = multi[series.name] >= sla
ok = ok.where(multi[series.name].notna(), pd.NA)
return ok
if "2g_avail_post" in multi.columns:
multi["ok_2g_post"] = _ok_flag(multi["2g_avail_post"], sla_2g)
if "3g_avail_post" in multi.columns:
multi["ok_3g_post"] = _ok_flag(multi["3g_avail_post"], sla_3g)
if "lte_avail_post" in multi.columns:
multi["ok_lte_post"] = _ok_flag(multi["lte_avail_post"], sla_lte)
def classify_row(row):
rats_status = []
for rat, col in [
("2G", "ok_2g_post"),
("3G", "ok_3g_post"),
("LTE", "ok_lte_post"),
]:
if col in row and not pd.isna(row[col]):
rats_status.append((rat, bool(row[col])))
if not rats_status:
return "No RAT data"
bad_rats = [rat for rat, ok in rats_status if not ok]
if not bad_rats:
return "OK all RAT"
if len(bad_rats) == 1:
return f"Degraded {bad_rats[0]} only"
return "Degraded multi-RAT (" + ",".join(bad_rats) + ")"
multi["post_multirat_status"] = multi.apply(classify_row, axis=1)
ordered_cols = ["code"]
if "City" in multi.columns:
ordered_cols.append("City")
for col in [
"2g_avail_pre",
"2g_avail_post",
"3g_avail_pre",
"3g_avail_post",
"lte_avail_pre",
"lte_avail_post",
"post_total_voice_trafic",
"post_total_data_trafic",
"ok_2g_post",
"ok_3g_post",
"ok_lte_post",
"post_multirat_status",
]:
if col in multi.columns:
ordered_cols.append(col)
remaining_cols = [c for c in multi.columns if c not in ordered_cols]
multi = multi[ordered_cols + remaining_cols]
return multi
def analyze_persistent_availability(
df: pd.DataFrame,
multi_rat_df: pd.DataFrame,
sla_2g: float,
sla_3g: float,
sla_lte: float,
min_consecutive_days: int = 3,
) -> pd.DataFrame:
if df is None or df.empty:
return pd.DataFrame()
if "date" not in df.columns or "code" not in df.columns:
return pd.DataFrame()
work_df = df.copy()
work_df["date_only"] = work_df["date"].dt.date
site_stats = {}
def _update_stats(rat_key_prefix: str, grouped: pd.DataFrame, sla: float) -> None:
if grouped.empty:
return
for code, group in grouped.groupby("code"):
group = group.sort_values("date_only")
dates = pd.to_datetime(group["date_only"]).tolist()
below_flags = (group["value"] < sla).tolist()
max_streak = 0
current_streak = 0
total_below = 0
last_date = None
for flag, current_date in zip(below_flags, dates):
if flag:
total_below += 1
if (
last_date is not None
and current_date == last_date + timedelta(days=1)
and current_streak > 0
):
current_streak += 1
else:
current_streak = 1
if current_streak > max_streak:
max_streak = current_streak
else:
current_streak = 0
last_date = current_date
stats = site_stats.setdefault(
code,
{
"code": code,
"max_streak_2g": 0,
"max_streak_3g": 0,
"max_streak_lte": 0,
"below_days_2g": 0,
"below_days_3g": 0,
"below_days_lte": 0,
},
)
stats[f"max_streak_{rat_key_prefix}"] = max_streak
stats[f"below_days_{rat_key_prefix}"] = total_below
for rat_col, rat_key, sla in [
("2g_tch_avail", "2g", sla_2g),
("3g_cell_avail", "3g", sla_3g),
("lte_cell_avail", "lte", sla_lte),
]:
if rat_col in work_df.columns:
g = (
work_df.dropna(subset=[rat_col])
.groupby(["code", "date_only"])[rat_col]
.mean()
.reset_index()
)
g = g.rename(columns={rat_col: "value"})
_update_stats(rat_key, g, sla)
if not site_stats:
return pd.DataFrame()
rows = []
for code, s in site_stats.items():
max_2g = s.get("max_streak_2g", 0)
max_3g = s.get("max_streak_3g", 0)
max_lte = s.get("max_streak_lte", 0)
below_2g = s.get("below_days_2g", 0)
below_3g = s.get("below_days_3g", 0)
below_lte = s.get("below_days_lte", 0)
persistent_2g = max_2g >= min_consecutive_days if max_2g else False
persistent_3g = max_3g >= min_consecutive_days if max_3g else False
persistent_lte = max_lte >= min_consecutive_days if max_lte else False
total_below_any = below_2g + below_3g + below_lte
persistent_any = persistent_2g or persistent_3g or persistent_lte
rats_persistent_count = sum(
[persistent_2g is True, persistent_3g is True, persistent_lte is True]
)
rows.append(
{
"code": code,
"persistent_issue_2g": persistent_2g,
"persistent_issue_3g": persistent_3g,
"persistent_issue_lte": persistent_lte,
"max_consecutive_days_2g": max_2g,
"max_consecutive_days_3g": max_3g,
"max_consecutive_days_lte": max_lte,
"total_below_days_2g": below_2g,
"total_below_days_3g": below_3g,
"total_below_days_lte": below_lte,
"total_below_days_any": total_below_any,
"persistent_issue_any": persistent_any,
"persistent_rats_count": rats_persistent_count,
}
)
result = pd.DataFrame(rows)
result = result[result["persistent_issue_any"] == True]
if result.empty:
return result
if multi_rat_df is not None and not multi_rat_df.empty:
cols_to_merge = [
c
for c in [
"code",
"City",
"post_total_voice_trafic",
"post_total_data_trafic",
"post_multirat_status",
]
if c in multi_rat_df.columns
]
if cols_to_merge:
result = pd.merge(
result,
multi_rat_df[cols_to_merge].drop_duplicates("code"),
on="code",
how="left",
)
if "post_total_data_trafic" not in result.columns:
result["post_total_data_trafic"] = 0.0
result["criticity_score"] = (
result["post_total_data_trafic"].fillna(0) * 1.0
+ result["total_below_days_any"].fillna(0) * 100.0
+ result["persistent_rats_count"].fillna(0) * 1000.0
)
result = result.sort_values(
by=["criticity_score", "total_below_days_any"], ascending=[False, False]
)
return result
def monthly_data_analysis(df: pd.DataFrame):
df["date"] = pd.to_datetime(df["date"])
df["month_year"] = df["date"].dt.to_period("M").astype(str)
voice_trafic = df.pivot_table(
index="code",
columns="month_year",
values="total_voice_trafic",
aggfunc="sum",
fill_value=0,
)
voice_trafic = voice_trafic.reindex(sorted(voice_trafic.columns), axis=1)
data_trafic = df.pivot_table(
index="code",
columns="month_year",
values="total_data_trafic",
aggfunc="sum",
fill_value=0,
)
data_trafic = data_trafic.reindex(sorted(data_trafic.columns), axis=1)
return voice_trafic, data_trafic
# --------------------------------------------------------------------------------------
# Global state for drill-down views & export
# --------------------------------------------------------------------------------------
current_full_df: pd.DataFrame | None = None
current_last_period_df: pd.DataFrame | None = None
current_analysis_df: pd.DataFrame | None = None
current_analysis_last_period_df: pd.DataFrame | None = None
current_multi_rat_df: pd.DataFrame | None = None
current_persistent_df: pd.DataFrame | None = None
current_site_2g_avail: pd.DataFrame | None = None
current_site_3g_avail: pd.DataFrame | None = None
current_site_lte_avail: pd.DataFrame | None = None
current_summary_2g_avail: pd.DataFrame | None = None
current_summary_3g_avail: pd.DataFrame | None = None
current_summary_lte_avail: pd.DataFrame | None = None
current_monthly_voice_df: pd.DataFrame | None = None
current_monthly_data_df: pd.DataFrame | None = None
current_sum_pre_post_df: pd.DataFrame | None = None
current_avg_pre_post_df: pd.DataFrame | None = None
current_availability_summary_all_df: pd.DataFrame | None = None
current_export_multi_rat_df: pd.DataFrame | None = None
current_export_persistent_df: pd.DataFrame | None = None
current_export_bytes: bytes | None = None
# --------------------------------------------------------------------------------------
# Widgets
# --------------------------------------------------------------------------------------
PLOTLY_CONFIG = {"displaylogo": False, "scrollZoom": True, "displayModeBar": True}
file_2g = pn.widgets.FileInput(name="2G Traffic Report", accept=".csv,.zip")
file_3g = pn.widgets.FileInput(name="3G Traffic Report", accept=".csv,.zip")
file_lte = pn.widgets.FileInput(name="LTE Traffic Report", accept=".csv,.zip")
pre_range = pn.widgets.DateRangePicker(name="Pre-period (from - to)")
post_range = pn.widgets.DateRangePicker(name="Post-period (from - to)")
last_range = pn.widgets.DateRangePicker(name="Last period (from - to)")
sla_2g = pn.widgets.FloatInput(name="2G TCH availability SLA (%)", value=98.0, step=0.1)
sla_3g = pn.widgets.FloatInput(
name="3G Cell availability SLA (%)", value=98.0, step=0.1
)
sla_lte = pn.widgets.FloatInput(
name="LTE Cell availability SLA (%)", value=98.0, step=0.1
)
number_of_top_trafic_sites = pn.widgets.IntInput(
name="Number of top traffic sites", value=25
)
min_persistent_days_widget = pn.widgets.IntInput(
name="Minimum consecutive days below SLA to flag persistent issue",
value=3,
)
top_critical_n_widget = pn.widgets.IntInput(
name="Number of top critical sites to display", value=25
)
run_button = pn.widgets.Button(name="Run analysis", button_type="primary")
status_pane = pn.pane.Alert(
"Upload the 3 reports, select the 3 periods and click 'Run analysis'",
alert_type="primary",
)
summary_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
sum_pre_post_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
summary_2g_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
worst_2g_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
summary_3g_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
worst_3g_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
summary_lte_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
worst_lte_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
multi_rat_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
persistent_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
site_select = pn.widgets.AutocompleteInput(
name="Select a site for detailed view (Type to search)",
options={},
case_sensitive=False,
search_strategy="includes",
restrict=True,
placeholder="Type site code or city...",
)
site_traffic_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-site-traffic"],
)
site_traffic_plot = pn.Column(
site_traffic_plot_pane,
height=400,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "site-traffic-wrapper"],
)
site_avail_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-site-avail"],
)
site_avail_plot = pn.Column(
site_avail_plot_pane,
height=400,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "site-avail-wrapper"],
)
site_degraded_table = pn.widgets.Tabulator(
height=200,
sizing_mode="stretch_width",
layout="fit_data_table",
)
city_select = pn.widgets.AutocompleteInput(
name="Select a City for aggregated view (Type to search)",
options=[],
case_sensitive=False,
search_strategy="includes",
restrict=True,
placeholder="Type city name...",
)
city_traffic_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-city-traffic"],
)
city_traffic_plot = pn.Column(
city_traffic_plot_pane,
height=400,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "city-traffic-wrapper"],
)
city_avail_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-city-avail"],
)
city_avail_plot = pn.Column(
city_avail_plot_pane,
height=400,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "city-avail-wrapper"],
)
city_degraded_table = pn.widgets.Tabulator(
height=200,
sizing_mode="stretch_width",
layout="fit_data_table",
)
daily_avail_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-daily-avail"],
)
daily_avail_plot = pn.Column(
daily_avail_plot_pane,
height=400,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "daily-avail-wrapper"],
)
daily_degraded_table = pn.widgets.Tabulator(
height=200,
sizing_mode="stretch_width",
layout="fit_data_table",
)
top_data_sites_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
top_voice_sites_table = pn.widgets.Tabulator(
height=250,
sizing_mode="stretch_width",
layout="fit_data_table",
)
top_data_bar_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-top-data"],
)
top_data_bar_plot = pn.Column(
top_data_bar_plot_pane,
height=400,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "top-data-bar-wrapper"],
)
top_voice_bar_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-top-voice"],
)
top_voice_bar_plot = pn.Column(
top_voice_bar_plot_pane,
height=400,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "top-voice-bar-wrapper"],
)
data_map_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-data-map"],
)
data_map_plot = pn.Column(
data_map_plot_pane,
height=500,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "data-map-wrapper"],
)
voice_map_plot_pane = pn.pane.Plotly(
sizing_mode="stretch_both",
config=PLOTLY_CONFIG,
css_classes=["fullscreen-target-voice-map"],
)
voice_map_plot = pn.Column(
voice_map_plot_pane,
height=500,
sizing_mode="stretch_width",
css_classes=["plot-fullscreen-wrapper", "voice-map-wrapper"],
)
# Fullscreen helper logic has been replaced by client-side JS.
# Fullscreen buttons for each Plotly plot
site_traffic_fullscreen_btn = pn.widgets.Button(
name="Full screen site traffic", button_type="default"
)
site_avail_fullscreen_btn = pn.widgets.Button(
name="Full screen site availability", button_type="default"
)
city_traffic_fullscreen_btn = pn.widgets.Button(
name="Full screen city traffic", button_type="default"
)
city_avail_fullscreen_btn = pn.widgets.Button(
name="Full screen city availability", button_type="default"
)
daily_avail_fullscreen_btn = pn.widgets.Button(
name="Full screen daily availability", button_type="default"
)
top_data_fullscreen_btn = pn.widgets.Button(
name="Full screen top data bar", button_type="default"
)
top_voice_fullscreen_btn = pn.widgets.Button(
name="Full screen top voice bar", button_type="default"
)
data_map_fullscreen_btn = pn.widgets.Button(
name="Full screen data map", button_type="default"
)
voice_map_fullscreen_btn = pn.widgets.Button(
name="Full screen voice map", button_type="default"
)
multi_rat_download = pn.widgets.FileDownload(
label="Download Multi-RAT table (CSV)",
filename="multi_rat_availability.csv",
button_type="default",
)
persistent_download = pn.widgets.FileDownload(
label="Download persistent issues (CSV)",
filename="persistent_issues.csv",
button_type="default",
)
top_data_download = pn.widgets.FileDownload(
label="Download top data sites (CSV)",
filename="top_data_sites.csv",
button_type="default",
)
top_voice_download = pn.widgets.FileDownload(
label="Download top voice sites (CSV)",
filename="top_voice_sites.csv",
button_type="default",
)
export_button = pn.widgets.FileDownload(
label="Download the Analysis Report",
filename="Global_Trafic_Analysis_Report.xlsx",
button_type="primary",
)
# --------------------------------------------------------------------------------------
# Callback
# --------------------------------------------------------------------------------------
def _validate_date_range(rng: tuple[date, date] | list[date], label: str) -> None:
if not rng or len(rng) != 2:
raise ValueError(f"Please select 2 dates for {label}.")
if rng[0] is None or rng[1] is None:
raise ValueError(f"Please select valid dates for {label}.")
def run_analysis(event=None): # event param required by on_click
try:
status_pane.object = "Running analysis..."
status_pane.alert_type = "primary"
global current_full_df, current_last_period_df
global current_analysis_df, current_analysis_last_period_df
global current_multi_rat_df, current_persistent_df
global current_site_2g_avail, current_site_3g_avail, current_site_lte_avail
global current_summary_2g_avail, current_summary_3g_avail, current_summary_lte_avail
global current_monthly_voice_df, current_monthly_data_df
global current_sum_pre_post_df, current_avg_pre_post_df
global current_availability_summary_all_df
global current_export_multi_rat_df, current_export_persistent_df
global current_export_bytes
# Basic validations
if not (file_2g.value and file_3g.value and file_lte.value):
raise ValueError("Please upload all 3 traffic reports (2G, 3G, LTE).")
_validate_date_range(pre_range.value, "pre-period")
_validate_date_range(post_range.value, "post-period")
_validate_date_range(last_range.value, "last period")
# Simple check on overlapping pre/post (same logic as Streamlit version, but lighter)
pre_start, pre_end = pre_range.value
post_start, post_end = post_range.value
if pre_start == post_start and pre_end == post_end:
raise ValueError("Pre and post periods are the same.")
if pre_start < post_start and pre_end > post_end:
raise ValueError("Pre and post periods are overlapping.")
df_2g = read_fileinput_to_df(file_2g)
df_3g = read_fileinput_to_df(file_3g)
df_lte = read_fileinput_to_df(file_lte)
if df_2g is None or df_3g is None or df_lte is None:
raise ValueError("Failed to read one or more input files.")
summary = pd.DataFrame(
{
"Dataset": ["2G", "3G", "LTE"],
"Rows": [len(df_2g), len(df_3g), len(df_lte)],
"Columns": [df_2g.shape[1], df_3g.shape[1], df_lte.shape[1]],
}
)
summary_table.value = summary
df_2g_clean = preprocess_2g(df_2g)
df_3g_clean = preprocess_3g(df_3g)
df_lte_clean = preprocess_lte(df_lte)
full_df, last_period, sum_pre_post_analysis, avg_pre_post_analysis = (
merge_and_compare(
df_2g_clean,
df_3g_clean,
df_lte_clean,
pre_range.value,
post_range.value,
last_range.value,
)
)
monthly_voice_df, monthly_data_df = monthly_data_analysis(full_df)
analysis_df = full_df
# Persist global state for later drill-down / export
current_full_df = full_df
current_last_period_df = last_period
current_analysis_df = analysis_df
current_analysis_last_period_df = last_period
current_monthly_voice_df = monthly_voice_df
current_monthly_data_df = monthly_data_df
current_sum_pre_post_df = sum_pre_post_analysis
current_avg_pre_post_df = avg_pre_post_analysis
sum_pre_post_table.value = sum_pre_post_analysis
summary_2g_avail, site_2g_avail = analyze_2g_availability(
analysis_df, float(sla_2g.value)
)
if summary_2g_avail is not None:
summary_2g_table.value = summary_2g_avail.round(2)
worst_sites_2g = site_2g_avail.sort_values("tch_avail_post").head(25)
worst_2g_table.value = worst_sites_2g.round(2)
else:
summary_2g_table.value = pd.DataFrame()
worst_2g_table.value = pd.DataFrame()
current_summary_2g_avail = summary_2g_avail
current_site_2g_avail = site_2g_avail if summary_2g_avail is not None else None
summary_3g_avail, site_3g_avail = analyze_3g_availability(
analysis_df, float(sla_3g.value)
)
if summary_3g_avail is not None:
summary_3g_table.value = summary_3g_avail.round(2)
worst_sites_3g = site_3g_avail.sort_values("cell_avail_post").head(25)
worst_3g_table.value = worst_sites_3g.round(2)
else:
summary_3g_table.value = pd.DataFrame()
worst_3g_table.value = pd.DataFrame()
current_summary_3g_avail = summary_3g_avail
current_site_3g_avail = site_3g_avail if summary_3g_avail is not None else None
summary_lte_avail, site_lte_avail = analyze_lte_availability(
analysis_df, float(sla_lte.value)
)
if summary_lte_avail is not None:
summary_lte_table.value = summary_lte_avail.round(2)
worst_sites_lte = site_lte_avail.sort_values("lte_avail_post").head(25)
worst_lte_table.value = worst_sites_lte.round(2)
else:
summary_lte_table.value = pd.DataFrame()
worst_lte_table.value = pd.DataFrame()
current_summary_lte_avail = summary_lte_avail
current_site_lte_avail = (
site_lte_avail if summary_lte_avail is not None else None
)
# Build availability summary across RATs for export
availability_frames = []
if summary_2g_avail is not None:
tmp = summary_2g_avail.copy()
tmp["RAT"] = "2G"
availability_frames.append(tmp)
if summary_3g_avail is not None:
tmp = summary_3g_avail.copy()
tmp["RAT"] = "3G"
availability_frames.append(tmp)
if summary_lte_avail is not None:
tmp = summary_lte_avail.copy()
tmp["RAT"] = "LTE"
availability_frames.append(tmp)
current_availability_summary_all_df = (
pd.concat(availability_frames, ignore_index=True)
if availability_frames
else pd.DataFrame()
)
multi_rat_df = analyze_multirat_availability(
analysis_df,
float(sla_2g.value),
float(sla_3g.value),
float(sla_lte.value),
)
if multi_rat_df is not None:
multi_rat_table.value = multi_rat_df.round(2)
else:
multi_rat_table.value = pd.DataFrame()
current_multi_rat_df = multi_rat_df if multi_rat_df is not None else None
# Persistent availability (UI uses configurable threshold, export keeps 3 days)
persistent_df = pd.DataFrame()
if multi_rat_df is not None:
persistent_df = analyze_persistent_availability(
analysis_df,
multi_rat_df,
float(sla_2g.value),
float(sla_3g.value),
float(sla_lte.value),
int(min_persistent_days_widget.value),
)
current_persistent_df = (
persistent_df
if persistent_df is not None and not persistent_df.empty
else None
)
# Export-specific multi-RAT & persistent (based on full_df as in Streamlit app)
export_multi_rat_base = analyze_multirat_availability(
full_df,
float(sla_2g.value),
float(sla_3g.value),
float(sla_lte.value),
)
current_export_multi_rat_df = (
export_multi_rat_base
if export_multi_rat_base is not None
else pd.DataFrame()
)
export_persistent_tmp = pd.DataFrame()
if export_multi_rat_base is not None:
export_persistent_tmp = analyze_persistent_availability(
full_df,
export_multi_rat_base,
float(sla_2g.value),
float(sla_3g.value),
float(sla_lte.value),
3,
)
current_export_persistent_df = (
export_persistent_tmp
if export_persistent_tmp is not None and not export_persistent_tmp.empty
else pd.DataFrame()
)
# Precompute export bytes so the download button is instant
current_export_bytes = _build_export_bytes()
# Update all drill-down & map views
_update_site_controls()
_update_city_controls()
_update_daily_availability_view()
_update_top_sites_and_maps()
_update_persistent_table_view()
status_pane.alert_type = "success"
status_pane.object = "Analysis completed."
except Exception as exc: # noqa: BLE001
status_pane.alert_type = "danger"
status_pane.object = f"Error: {exc}"
run_button.on_click(run_analysis)
def _update_site_controls() -> None:
"""Populate site selection widget based on current_analysis_df and refresh view."""
if current_analysis_df is None or current_analysis_df.empty:
site_select.options = {}
site_select.value = None
site_traffic_plot_pane.object = None
site_avail_plot_pane.object = None
site_degraded_table.value = pd.DataFrame()
return
sites_df = (
current_analysis_df[["code", "City"]]
.drop_duplicates()
.sort_values(by=["City", "code"])
)
options: dict[str, int] = {}
for _, row in sites_df.iterrows():
label = (
f"{row['City']}_{row['code']}"
if pd.notna(row["City"])
else str(row["code"])
)
options[label] = int(row["code"])
site_select.options = options
if options and site_select.value not in options.values():
# When options is a dict, Select.value is the mapped value (code)
site_select.value = next(iter(options.values()))
_update_site_view()
def _update_site_view(event=None) -> None: # noqa: D401, ARG001
"""Update site drill-down plots and table from current_analysis_df and site_select."""
if current_analysis_df is None or current_analysis_df.empty:
site_traffic_plot_pane.object = None
site_avail_plot_pane.object = None
site_degraded_table.value = pd.DataFrame()
return
selected_code = site_select.value
if selected_code is None:
site_traffic_plot_pane.object = None
site_avail_plot_pane.object = None
site_degraded_table.value = pd.DataFrame()
return
site_detail_df = current_analysis_df[
current_analysis_df["code"] == int(selected_code)
].copy()
if site_detail_df.empty:
site_traffic_plot_pane.object = None
site_avail_plot_pane.object = None
site_degraded_table.value = pd.DataFrame()
return
site_detail_df = site_detail_df.sort_values("date")
# Traffic over time
traffic_cols = [
col
for col in ["total_voice_trafic", "total_data_trafic"]
if col in site_detail_df.columns
]
first_row = site_detail_df.iloc[0]
site_label = f"{first_row['code']}"
if pd.notna(first_row.get("City")):
site_label += f" ({first_row['City']})"
if traffic_cols:
traffic_long = site_detail_df[["date"] + traffic_cols].melt(
id_vars="date",
value_vars=traffic_cols,
var_name="metric",
value_name="value",
)
fig_traffic = px.line(
traffic_long,
x="date",
y="value",
color="metric",
color_discrete_sequence=px.colors.qualitative.Plotly,
)
fig_traffic.update_layout(
title=f"Traffic Evolution - Site: {site_label}",
template="plotly_white",
plot_bgcolor="white",
paper_bgcolor="white",
)
site_traffic_plot_pane.object = fig_traffic
else:
site_traffic_plot_pane.object = None
# Availability over time per RAT
avail_cols: list[str] = []
rename_map: dict[str, str] = {}
if "2g_tch_avail" in site_detail_df.columns:
avail_cols.append("2g_tch_avail")
rename_map["2g_tch_avail"] = "2G"
if "3g_cell_avail" in site_detail_df.columns:
avail_cols.append("3g_cell_avail")
rename_map["3g_cell_avail"] = "3G"
if "lte_cell_avail" in site_detail_df.columns:
avail_cols.append("lte_cell_avail")
rename_map["lte_cell_avail"] = "LTE"
if avail_cols:
avail_df = site_detail_df[["date"] + avail_cols].copy()
avail_df = avail_df.rename(columns=rename_map)
value_cols = [c for c in avail_df.columns if c != "date"]
avail_long = avail_df.melt(
id_vars="date",
value_vars=value_cols,
var_name="RAT",
value_name="availability",
)
fig_avail = px.line(
avail_long,
x="date",
y="availability",
color="RAT",
color_discrete_sequence=px.colors.qualitative.Plotly,
)
fig_avail.update_layout(
title=f"Availability vs SLA - Site: {site_label}",
template="plotly_white",
plot_bgcolor="white",
paper_bgcolor="white",
)
site_avail_plot_pane.object = fig_avail
# Days with availability below SLA per RAT
site_detail_df["date_only"] = site_detail_df["date"].dt.date
degraded_rows_site: list[dict] = []
for rat_col, rat_name, sla_value in [
("2g_tch_avail", "2G", float(sla_2g.value)),
("3g_cell_avail", "3G", float(sla_3g.value)),
("lte_cell_avail", "LTE", float(sla_lte.value)),
]:
if rat_col in site_detail_df.columns:
daily_site = (
site_detail_df.groupby("date_only")[rat_col].mean().dropna()
)
mask = daily_site < sla_value
for d, val in daily_site[mask].items():
degraded_rows_site.append(
{
"RAT": rat_name,
"date": d,
"avg_availability": val,
"SLA": sla_value,
}
)
if degraded_rows_site:
degraded_site_df = pd.DataFrame(degraded_rows_site)
site_degraded_table.value = degraded_site_df.round(2)
else:
site_degraded_table.value = pd.DataFrame()
else:
site_avail_plot_pane.object = None
site_degraded_table.value = pd.DataFrame()
def _update_city_controls() -> None:
"""Populate city selection widget based on current_analysis_df and refresh view."""
if current_analysis_df is None or current_analysis_df.empty:
city_select.options = []
city_select.value = None
city_traffic_plot_pane.object = None
city_avail_plot_pane.object = None
city_degraded_table.value = pd.DataFrame()
return
if (
"City" not in current_analysis_df.columns
or not current_analysis_df["City"].notna().any()
):
city_select.options = []
city_select.value = None
city_traffic_plot_pane.object = None
city_avail_plot_pane.object = pd.DataFrame()
city_degraded_table.value = pd.DataFrame()
return
cities_df = (
current_analysis_df[["City"]].dropna().drop_duplicates().sort_values(by="City")
)
options = cities_df["City"].tolist()
city_select.options = options
if options and city_select.value not in options:
city_select.value = options[0]
_update_city_view()
def _update_city_view(event=None) -> None: # noqa: D401, ARG001
"""Update city drill-down plots and degraded days table based on city_select."""
if current_analysis_df is None or current_analysis_df.empty:
city_traffic_plot_pane.object = None
city_avail_plot_pane.object = None
city_degraded_table.value = pd.DataFrame()
return
selected_city = city_select.value
if not selected_city:
city_traffic_plot_pane.object = None
city_avail_plot_pane.object = None
city_degraded_table.value = pd.DataFrame()
return
city_detail_df = current_analysis_df[
current_analysis_df["City"] == selected_city
].copy()
if city_detail_df.empty:
city_traffic_plot_pane.object = None
city_avail_plot_pane.object = None
city_degraded_table.value = pd.DataFrame()
return
city_detail_df = city_detail_df.sort_values("date")
# Traffic aggregated at city level
traffic_cols_city = [
col
for col in ["total_voice_trafic", "total_data_trafic"]
if col in city_detail_df.columns
]
if traffic_cols_city:
city_traffic = (
city_detail_df.groupby("date")[traffic_cols_city].sum().reset_index()
)
traffic_long_city = city_traffic.melt(
id_vars="date",
value_vars=traffic_cols_city,
var_name="metric",
value_name="value",
)
fig_traffic_city = px.line(
traffic_long_city,
x="date",
y="value",
color="metric",
color_discrete_sequence=px.colors.qualitative.Plotly,
)
fig_traffic_city.update_layout(
title=f"Total Traffic Evolution - City: {selected_city}",
template="plotly_white",
plot_bgcolor="white",
paper_bgcolor="white",
)
city_traffic_plot_pane.object = fig_traffic_city
else:
city_traffic_plot_pane.object = None
# Availability aggregated at city level
avail_cols_city: list[str] = []
rename_map_city: dict[str, str] = {}
if "2g_tch_avail" in city_detail_df.columns:
avail_cols_city.append("2g_tch_avail")
rename_map_city["2g_tch_avail"] = "2G"
if "3g_cell_avail" in city_detail_df.columns:
avail_cols_city.append("3g_cell_avail")
rename_map_city["3g_cell_avail"] = "3G"
if "lte_cell_avail" in city_detail_df.columns:
avail_cols_city.append("lte_cell_avail")
rename_map_city["lte_cell_avail"] = "LTE"
if avail_cols_city:
avail_city_df = city_detail_df[["date"] + avail_cols_city].copy()
avail_city_df = avail_city_df.rename(columns=rename_map_city)
value_cols_city = [c for c in avail_city_df.columns if c != "date"]
avail_long_city = avail_city_df.melt(
id_vars="date",
value_vars=value_cols_city,
var_name="RAT",
value_name="availability",
)
fig_avail_city = px.line(
avail_long_city,
x="date",
y="availability",
color="RAT",
color_discrete_sequence=px.colors.qualitative.Plotly,
)
fig_avail_city.update_layout(
title=f"Availability vs SLA - City: {selected_city}",
template="plotly_white",
plot_bgcolor="white",
paper_bgcolor="white",
)
city_avail_plot_pane.object = fig_avail_city
city_detail_df["date_only"] = city_detail_df["date"].dt.date
degraded_rows_city: list[dict] = []
for rat_col, rat_name, sla_value in [
("2g_tch_avail", "2G", float(sla_2g.value)),
("3g_cell_avail", "3G", float(sla_3g.value)),
("lte_cell_avail", "LTE", float(sla_lte.value)),
]:
if rat_col in city_detail_df.columns:
daily_city = (
city_detail_df.groupby("date_only")[rat_col].mean().dropna()
)
mask_city = daily_city < sla_value
for d, val in daily_city[mask_city].items():
degraded_rows_city.append(
{
"RAT": rat_name,
"date": d,
"avg_availability": val,
"SLA": sla_value,
}
)
if degraded_rows_city:
degraded_city_df = pd.DataFrame(degraded_rows_city)
city_degraded_table.value = degraded_city_df.round(2)
else:
city_degraded_table.value = pd.DataFrame()
else:
city_avail_plot_pane.object = None
city_degraded_table.value = pd.DataFrame()
def _update_daily_availability_view() -> None:
"""Daily average availability per RAT over the full analysis_df."""
if current_analysis_df is None or current_analysis_df.empty:
daily_avail_plot_pane.object = None
daily_degraded_table.value = pd.DataFrame()
return
temp_df = current_analysis_df.copy()
if not any(
col in temp_df.columns
for col in ["2g_tch_avail", "3g_cell_avail", "lte_cell_avail"]
):
daily_avail_plot_pane.object = None
daily_degraded_table.value = pd.DataFrame()
return
temp_df["date_only"] = temp_df["date"].dt.date
agg_dict: dict[str, str] = {}
if "2g_tch_avail" in temp_df.columns:
agg_dict["2g_tch_avail"] = "mean"
if "3g_cell_avail" in temp_df.columns:
agg_dict["3g_cell_avail"] = "mean"
if "lte_cell_avail" in temp_df.columns:
agg_dict["lte_cell_avail"] = "mean"
daily_avail = (
temp_df.groupby("date_only", as_index=False).agg(agg_dict)
if agg_dict
else pd.DataFrame()
)
if daily_avail.empty:
daily_avail_plot_pane.object = None
daily_degraded_table.value = pd.DataFrame()
return
rename_map: dict[str, str] = {}
if "2g_tch_avail" in daily_avail.columns:
rename_map["2g_tch_avail"] = "2G"
if "3g_cell_avail" in daily_avail.columns:
rename_map["3g_cell_avail"] = "3G"
if "lte_cell_avail" in daily_avail.columns:
rename_map["lte_cell_avail"] = "LTE"
daily_avail = daily_avail.rename(columns=rename_map)
value_cols = [c for c in daily_avail.columns if c != "date_only"]
if not value_cols:
daily_avail_plot_pane.object = None
daily_degraded_table.value = pd.DataFrame()
return
daily_melt = daily_avail.melt(
id_vars="date_only",
value_vars=value_cols,
var_name="RAT",
value_name="availability",
)
fig = px.line(
daily_melt,
x="date_only",
y="availability",
color="RAT",
markers=True,
color_discrete_sequence=px.colors.qualitative.Plotly,
)
fig.update_layout(
template="plotly_white",
plot_bgcolor="white",
paper_bgcolor="white",
)
daily_avail_plot_pane.object = fig
degraded_rows: list[dict] = []
for rat_name, sla_value in [
("2G", float(sla_2g.value)),
("3G", float(sla_3g.value)),
("LTE", float(sla_lte.value)),
]:
if rat_name in daily_avail.columns:
series = daily_avail[rat_name]
mask = series < sla_value
for d, val in zip(daily_avail.loc[mask, "date_only"], series[mask]):
degraded_rows.append(
{
"RAT": rat_name,
"date": d,
"avg_availability": val,
"SLA": sla_value,
}
)
if degraded_rows:
degraded_df = pd.DataFrame(degraded_rows)
daily_degraded_table.value = degraded_df.round(2)
else:
daily_degraded_table.value = pd.DataFrame()
def _update_top_sites_and_maps() -> None:
"""Top traffic sites and geographic maps based on last analysis period."""
if current_analysis_last_period_df is None or current_analysis_last_period_df.empty:
top_data_sites_table.value = pd.DataFrame()
top_voice_sites_table.value = pd.DataFrame()
top_data_bar_plot_pane.object = None
top_voice_bar_plot_pane.object = None
data_map_plot_pane.object = None
voice_map_plot_pane.object = None
return
df = current_analysis_last_period_df
n = int(number_of_top_trafic_sites.value or 25)
# Top sites by data traffic
top_sites = (
df.groupby(["code", "City"])["total_data_trafic"]
.sum()
.sort_values(ascending=False)
.head(n)
)
top_data_sites_table.value = top_sites.sort_values(ascending=True).reset_index()
fig_data = px.bar(
top_sites.reset_index(),
y=top_sites.reset_index()[["City", "code"]].agg(
lambda x: "_".join(map(str, x)), axis=1
),
x="total_data_trafic",
title=f"Top {n} sites by data traffic",
orientation="h",
text="total_data_trafic",
color_discrete_sequence=px.colors.qualitative.Plotly,
)
fig_data.update_layout(
template="plotly_white",
plot_bgcolor="white",
paper_bgcolor="white",
)
top_data_bar_plot_pane.object = fig_data
# Top sites by voice traffic
top_sites_voice = (
df.groupby(["code", "City"])["total_voice_trafic"]
.sum()
.sort_values(ascending=False)
.head(n)
)
top_voice_sites_table.value = top_sites_voice.sort_values(
ascending=True
).reset_index()
fig_voice = px.bar(
top_sites_voice.reset_index(),
y=top_sites_voice.reset_index()[["City", "code"]].agg(
lambda x: "_".join(map(str, x)), axis=1
),
x="total_voice_trafic",
title=f"Top {n} sites by voice traffic",
orientation="h",
text="total_voice_trafic",
color_discrete_sequence=px.colors.qualitative.Plotly,
)
fig_voice.update_layout(
template="plotly_white",
plot_bgcolor="white",
paper_bgcolor="white",
)
top_voice_bar_plot_pane.object = fig_voice
# Maps
if {"Latitude", "Longitude"}.issubset(df.columns):
min_size = 5
max_size = 40
# Data traffic map
df_data = (
df.groupby(["code", "City", "Latitude", "Longitude"])["total_data_trafic"]
.sum()
.reset_index()
)
if not df_data.empty:
traffic_data_min = df_data["total_data_trafic"].min()
traffic_data_max = df_data["total_data_trafic"].max()
if traffic_data_max > traffic_data_min:
df_data["bubble_size"] = df_data["total_data_trafic"].apply(
lambda x: min_size
+ (max_size - min_size)
* (x - traffic_data_min)
/ (traffic_data_max - traffic_data_min)
)
else:
df_data["bubble_size"] = min_size
custom_blue_red = [
[0.0, "#4292c6"],
[0.2, "#2171b5"],
[0.4, "#084594"],
[0.6, "#cb181d"],
[0.8, "#a50f15"],
[1.0, "#67000d"],
]
fig_map_data = px.scatter_map(
df_data,
lat="Latitude",
lon="Longitude",
color="total_data_trafic",
size="bubble_size",
color_continuous_scale=custom_blue_red,
size_max=max_size,
zoom=10,
height=600,
title="Data traffic distribution",
hover_data={"code": True, "total_data_trafic": True},
hover_name="code",
text=[str(x) for x in df_data["code"]],
)
fig_map_data.update_layout(
mapbox_style="open-street-map",
coloraxis_colorbar=dict(title="Total Data Traffic (MB)"),
coloraxis=dict(cmin=traffic_data_min, cmax=traffic_data_max),
font=dict(size=10, color="black"),
)
data_map_plot_pane.object = fig_map_data
else:
data_map_plot_pane.object = None
# Voice traffic map
df_voice = (
df.groupby(["code", "City", "Latitude", "Longitude"])["total_voice_trafic"]
.sum()
.reset_index()
)
if not df_voice.empty:
traffic_voice_min = df_voice["total_voice_trafic"].min()
traffic_voice_max = df_voice["total_voice_trafic"].max()
if traffic_voice_max > traffic_voice_min:
df_voice["bubble_size"] = df_voice["total_voice_trafic"].apply(
lambda x: min_size
+ (max_size - min_size)
* (x - traffic_voice_min)
/ (traffic_voice_max - traffic_voice_min)
)
else:
df_voice["bubble_size"] = min_size
custom_blue_red = [
[0.0, "#4292c6"],
[0.2, "#2171b5"],
[0.4, "#084594"],
[0.6, "#cb181d"],
[0.8, "#a50f15"],
[1.0, "#67000d"],
]
fig_map_voice = px.scatter_map(
df_voice,
lat="Latitude",
lon="Longitude",
color="total_voice_trafic",
size="bubble_size",
color_continuous_scale=custom_blue_red,
size_max=max_size,
zoom=10,
height=600,
title="Voice traffic distribution",
hover_data={"code": True, "total_voice_trafic": True},
hover_name="code",
text=[str(x) for x in df_voice["code"]],
)
fig_map_voice.update_layout(
mapbox_style="open-street-map",
coloraxis_colorbar=dict(title="Total Voice Traffic (MB)"),
coloraxis=dict(cmin=traffic_voice_min, cmax=traffic_voice_max),
font=dict(size=10, color="black"),
)
voice_map_plot_pane.object = fig_map_voice
else:
voice_map_plot_pane.object = None
else:
data_map_plot_pane.object = None
voice_map_plot_pane.object = None
def _update_persistent_table_view(event=None) -> None: # noqa: D401, ARG001
"""Update persistent issues table based on current_persistent_df and top_critical_n."""
if current_persistent_df is None or current_persistent_df.empty:
persistent_table.value = pd.DataFrame()
return
n = int(top_critical_n_widget.value or 25)
persistent_table.value = current_persistent_df.head(n).round(2)
def _recompute_persistent_from_widget(event=None) -> None: # noqa: ARG001
"""Recompute persistent issues when the minimum consecutive days widget changes."""
global current_persistent_df
if (
current_analysis_df is None
or current_analysis_df.empty
or current_multi_rat_df is None
or current_multi_rat_df.empty
):
current_persistent_df = None
persistent_table.value = pd.DataFrame()
return
persistent_df = analyze_persistent_availability(
current_analysis_df,
current_multi_rat_df,
float(sla_2g.value),
float(sla_3g.value),
float(sla_lte.value),
int(min_persistent_days_widget.value),
)
current_persistent_df = (
persistent_df if persistent_df is not None and not persistent_df.empty else None
)
_update_persistent_table_view()
def _build_export_bytes() -> bytes:
"""Build Excel report bytes mirroring the Streamlit export structure."""
if current_full_df is None:
return b""
dfs: list[pd.DataFrame] = [
current_full_df,
(
current_sum_pre_post_df
if current_sum_pre_post_df is not None
else pd.DataFrame()
),
(
current_avg_pre_post_df
if current_avg_pre_post_df is not None
else pd.DataFrame()
),
(
current_monthly_voice_df
if current_monthly_voice_df is not None
else pd.DataFrame()
),
(
current_monthly_data_df
if current_monthly_data_df is not None
else pd.DataFrame()
),
(
current_availability_summary_all_df
if current_availability_summary_all_df is not None
else pd.DataFrame()
),
current_site_2g_avail if current_site_2g_avail is not None else pd.DataFrame(),
current_site_3g_avail if current_site_3g_avail is not None else pd.DataFrame(),
(
current_site_lte_avail
if current_site_lte_avail is not None
else pd.DataFrame()
),
(
current_export_multi_rat_df
if current_export_multi_rat_df is not None
else pd.DataFrame()
),
(
current_export_persistent_df
if current_export_persistent_df is not None
else pd.DataFrame()
),
]
sheet_names = [
"Global_Trafic_Analysis",
"Sum_pre_post_analysis",
"Avg_pre_post_analysis",
"Monthly_voice_analysis",
"Monthly_data_analysis",
"Availability_Summary_All_RAT",
"TwoG_Availability_By_Site",
"ThreeG_Availability_By_Site",
"LTE_Availability_By_Site",
"MultiRAT_Availability_By_Site",
"Top_Critical_Sites",
]
return write_dfs_to_excel(dfs, sheet_names, index=True)
def _export_callback() -> bytes:
# Use cached bytes from the last completed analysis to make download instant
data = current_export_bytes or b""
if not data:
return io.BytesIO()
# FileDownload expects a file path or file-like object, not raw bytes
return io.BytesIO(data)
def _df_to_csv_bytes(df: pd.DataFrame | None) -> io.BytesIO:
if df is None or getattr(df, "empty", True): # handles None and empty DataFrame
return io.BytesIO()
return io.BytesIO(df.to_csv(index=False).encode("utf-8"))
def _download_multi_rat_table() -> io.BytesIO:
value = getattr(multi_rat_table, "value", None)
return _df_to_csv_bytes(value if isinstance(value, pd.DataFrame) else None)
def _download_persistent_table() -> io.BytesIO:
value = getattr(persistent_table, "value", None)
return _df_to_csv_bytes(value if isinstance(value, pd.DataFrame) else None)
def _download_top_data_sites() -> io.BytesIO:
value = getattr(top_data_sites_table, "value", None)
return _df_to_csv_bytes(value if isinstance(value, pd.DataFrame) else None)
def _download_top_voice_sites() -> io.BytesIO:
value = getattr(top_voice_sites_table, "value", None)
return _df_to_csv_bytes(value if isinstance(value, pd.DataFrame) else None)
# Client-side Fullscreen JS logic
# We target the specific CSS class assigned to each plot pane.
# Client-side Fullscreen JS logic with Shadow DOM support
_JS_FULLSCREEN = """
function findDeep(root, cls) {
if (!root) return null;
if (root.classList && root.classList.contains(cls)) return root;
if (root.shadowRoot) {
var found = findDeep(root.shadowRoot, cls);
if (found) return found;
}
var children = root.children;
if (children) {
for (var i = 0; i < children.length; i++) {
var found = findDeep(children[i], cls);
if (found) return found;
}
}
return null;
}
var el = findDeep(document.body, target_class);
if (el) {
if (el.requestFullscreen) {
el.requestFullscreen();
} else if (el.webkitRequestFullscreen) {
el.webkitRequestFullscreen();
} else if (el.msRequestFullscreen) {
el.msRequestFullscreen();
}
} else {
// Debug info
alert("Impossible de passer en plein écran : élément '" + target_class + "' introuvable même après recherche approfondie (Shadow DOM).");
}
"""
# Reactive bindings for drill-down controls & export
site_select.param.watch(_update_site_view, "value")
city_select.param.watch(_update_city_view, "value")
top_critical_n_widget.param.watch(_update_persistent_table_view, "value")
number_of_top_trafic_sites.param.watch(_update_top_sites_and_maps, "value")
min_persistent_days_widget.param.watch(_recompute_persistent_from_widget, "value")
export_button.callback = _export_callback
multi_rat_download.callback = _download_multi_rat_table
persistent_download.callback = _download_persistent_table
top_data_download.callback = _download_top_data_sites
top_voice_download.callback = _download_top_voice_sites
site_traffic_fullscreen_btn.js_on_click(
args={"target_class": "site-traffic-wrapper"},
code=_JS_FULLSCREEN,
)
site_avail_fullscreen_btn.js_on_click(
args={"target_class": "site-avail-wrapper"},
code=_JS_FULLSCREEN,
)
city_traffic_fullscreen_btn.js_on_click(
args={"target_class": "city-traffic-wrapper"},
code=_JS_FULLSCREEN,
)
city_avail_fullscreen_btn.js_on_click(
args={"target_class": "city-avail-wrapper"},
code=_JS_FULLSCREEN,
)
daily_avail_fullscreen_btn.js_on_click(
args={"target_class": "daily-avail-wrapper"},
code=_JS_FULLSCREEN,
)
top_data_fullscreen_btn.js_on_click(
args={"target_class": "top-data-bar-wrapper"},
code=_JS_FULLSCREEN,
)
top_voice_fullscreen_btn.js_on_click(
args={
"target_class": "top-voice-bar-wrapper",
},
code=_JS_FULLSCREEN,
)
data_map_fullscreen_btn.js_on_click(
args={"target_class": "data-map-wrapper"},
code=_JS_FULLSCREEN,
)
voice_map_fullscreen_btn.js_on_click(
args={"target_class": "voice-map-wrapper"},
code=_JS_FULLSCREEN,
)
# --------------------------------------------------------------------------------------
# Material Template layout
# --------------------------------------------------------------------------------------
template = pn.template.MaterialTemplate(
title="📊 Global Trafic Analysis - Panel (2G / 3G / LTE)",
)
# Ensure the template modal is large enough for fullscreen charts
# Modal CSS override removed as we switched to native fullscreen.
sidebar_content = pn.Column(
"""This Panel app is a migration of the existing Streamlit-based global traffic analysis.
Upload the 3 traffic reports (2G / 3G / LTE), configure the analysis periods and SLAs, then run the analysis.
In this first step, the app only validates the pipeline and shows a lightweight summary of the inputs.\nFull KPIs and visualizations will be added progressively.""",
"---",
file_2g,
file_3g,
file_lte,
"---",
pre_range,
post_range,
last_range,
"---",
sla_2g,
sla_3g,
sla_lte,
"---",
number_of_top_trafic_sites,
min_persistent_days_widget,
top_critical_n_widget,
"---",
run_button,
)
main_content = pn.Column(
status_pane,
pn.pane.Markdown("## Input datasets summary"),
summary_table,
pn.layout.Divider(),
pn.pane.Markdown("## Summary Analysis Pre / Post"),
sum_pre_post_table,
pn.layout.Divider(),
pn.pane.Markdown("## Availability vs SLA (per RAT)"),
pn.Tabs(
(
"2G",
pn.Column(
summary_2g_table, pn.pane.Markdown("Worst 25 sites"), worst_2g_table
),
),
(
"3G",
pn.Column(
summary_3g_table, pn.pane.Markdown("Worst 25 sites"), worst_3g_table
),
),
(
"LTE",
pn.Column(
summary_lte_table, pn.pane.Markdown("Worst 25 sites"), worst_lte_table
),
),
),
pn.layout.Divider(),
pn.pane.Markdown("## Multi-RAT Availability (post-period)"),
multi_rat_table,
multi_rat_download,
pn.layout.Divider(),
pn.pane.Markdown("## Persistent availability issues (critical sites)"),
persistent_table,
persistent_download,
pn.layout.Divider(),
pn.pane.Markdown("## Site drill-down: traffic and availability over time"),
site_select,
site_traffic_plot,
site_traffic_fullscreen_btn,
site_avail_plot,
site_avail_fullscreen_btn,
site_degraded_table,
pn.layout.Divider(),
pn.pane.Markdown("## City drill-down: traffic and availability over time"),
city_select,
city_traffic_plot,
city_traffic_fullscreen_btn,
city_avail_plot,
city_avail_fullscreen_btn,
city_degraded_table,
pn.layout.Divider(),
pn.pane.Markdown("## Daily average availability per RAT"),
daily_avail_plot,
daily_avail_fullscreen_btn,
daily_degraded_table,
pn.layout.Divider(),
pn.pane.Markdown("## Top traffic sites and geographic maps (last period)"),
pn.Row(
pn.Column(
pn.pane.Markdown("### Top sites by data traffic"),
top_data_sites_table,
top_data_download,
top_data_bar_plot,
top_data_fullscreen_btn,
),
pn.Column(
pn.pane.Markdown("### Top sites by voice traffic"),
top_voice_sites_table,
top_voice_download,
top_voice_bar_plot,
top_voice_fullscreen_btn,
),
),
pn.Row(
pn.Column(
pn.pane.Markdown("### Data traffic map"),
data_map_plot,
data_map_fullscreen_btn,
),
pn.Column(
pn.pane.Markdown("### Voice traffic map"),
voice_map_plot,
voice_map_fullscreen_btn,
),
),
pn.layout.Divider(),
pn.pane.Markdown("## Export"),
export_button,
)
def get_page_components():
return sidebar_content, main_content
if __name__ == "__main__":
template.sidebar.append(sidebar_content)
template.main.append(main_content)
template.servable()