# import re # from pathlib import Path # from datasets import load_dataset, Dataset, DatasetDict, Features, Value, Image # import re # from typing import Dict, List, Optional # from pathlib import Path # from datasets import Dataset, DatasetDict, concatenate_datasets, Features, Value, Sequence # # ------------------------------------------------------------------ # # 0) Load your JSON → `raw_ds` exactly as before # # ------------------------------------------------------------------ # files = [ # "pool_multiple_choice_chunk_01.json", # "pool_multiple_choice_chunk_02.json", # "pool_multiple_choice_chunk_03.json", # "pool_multiple_choice_chunk_04.json", # "pool_numerical_chunk_01.json", # "pool_numerical_chunk_02.json", # "pool_numerical_chunk_03.json", # "pool_regression_chunk_01.json", # ] # # ---- 1-4. load, trim, normalise ---------------------------------------- # def load_trim_normalise(fp, cap=10_000): # ds = Dataset.from_json(fp) # # a) truncate # ds = ds.select(range(min(cap, len(ds)))) # # b) make sure `options` exists and is always list[str] # if "options" not in ds.column_names: # ds = ds.add_column("options", [[]] * len(ds)) # else: # ds = ds.map( # lambda ex: {"options": [str(o) for o in (ex["options"] or [])]}, # remove_columns=[], num_proc=4, # ) # return ds # ds_list = [load_trim_normalise(fp) for fp in files] # # ---- 4. align feature schema explicitly (all files now identical) ------- # common_features = Features({ # "problem_id" : Value("int64"), # "problem" : Value("string"), # "data_type" : Value("string"), # "problem_type": Value("string"), # "options" : Sequence(Value("string")), # "solution" : Value("string"), # "path" : Value("string"), # "data_source" : Value("string"), # }) # ds_list = [d.cast(common_features) for d in ds_list] # # ---- 5. 
concatenate ----------------------------------------------------- # raw_train = concatenate_datasets(ds_list) # raw_ds = DatasetDict({"train": raw_train}) # # ------------------------------------------------------------------ # # 1) Build the question (unchanged) # # ------------------------------------------------------------------ # def build_question(example): # q = ( # example["problem"] + " Options:\n" + "\n".join(example["options"]) # if example["problem_type"] == "multiple choice" # else example["problem"] # ) # example["problem"] = q # return example # def extract_answer(predict: str) -> Optional[str]: # """ # Extracts the content of the block from `predict`. # Returns the inner text (with leading/trailing whitespace stripped), # or None if no tag is found. # """ # match = re.search(r"([\s\S]*?)", predict, re.DOTALL) # if not match: # return predict # return match.group(1).strip() # def add_answer(example): # # assumes the ground-truth answer (tagged) is in `solution` # example["answer"] = extract_answer(example["solution"]) # return example # # ------------------------------------------------------------------ # # 3) Embed image bytes (column name stays "images") # # ------------------------------------------------------------------ # def to_embedded_image(example): # if example["data_type"] != "image": # example["images"] = None # return example # with open(example["path"], "rb") as f: # img_bytes = f.read() # example["images"] = {"bytes": img_bytes, "path": None} # return example # # ------------------------------------------------------------------ # # 4) Full pipeline # # ------------------------------------------------------------------ # processed = ( # raw_ds["train"] # .map(build_question, num_proc=4) # .map(add_answer, num_proc=4) # .map(to_embedded_image, num_proc=4) # .remove_columns([ # "path", "data_type", "options", "problem_type", "solution", # "problem_id", "data_source" # ← drop these too # ]) # ) # # 
------------------------------------------------------------------ # # 5) Schema must match the final column names # # ------------------------------------------------------------------ # features = Features({ # "problem": Value("string"), # "answer" : Value("string"), # "images" : Image(), # keep plural name # }) # processed = processed.cast(features) # # ------------------------------------------------------------------ # # 6) Write Parquet shards (file prefix inside the folder) # # ------------------------------------------------------------------ # out_dir = Path("qwen2.5_vl_portable") # out_dir.mkdir(parents=True, exist_ok=True) # # processed.to_parquet(str(out_dir / "train.parquet")) # → train-00000-of-00001.parquet # processed.to_parquet(str("./hf_data/train.parquet")) # print("✓ Dataset written with embedded images and answers →", out_dir.resolve()) # import re # from pathlib import Path # from typing import Dict, List, Optional # from datasets import ( # Dataset, # DatasetDict, # concatenate_datasets, # Features, # Value, # Sequence, # Image, # ) # # ------------------------------------------------------------------ # # 0) Inputs # # ------------------------------------------------------------------ # files = [ # "pool_multiple_choice_chunk_01.json", # "pool_multiple_choice_chunk_02.json", # "pool_multiple_choice_chunk_03.json", # "pool_multiple_choice_chunk_04.json", # "pool_numerical_chunk_01.json", # "pool_numerical_chunk_02.json", # "pool_numerical_chunk_03.json", # "pool_regression_chunk_01.json", # ] # # ------------------------------------------------------------------ # # 1) Define common meta schema (what you want to keep in the output) # # ------------------------------------------------------------------ # common_features = Features({ # "problem_id" : Value("int64"), # "problem" : Value("string"), # "data_type" : Value("string"), # "problem_type": Value("string"), # "options" : Sequence(Value("string")), # "solution" : Value("string"), # "path" 
: Value("string"), # "data_source" : Value("string"), # }) # # Final (superset) schema to write: meta + new columns # full_features = common_features.copy() # full_features["answer"] = Value("string") # full_features["images"] = Image() # plural name kept, binary-friendly # # ------------------------------------------------------------------ # # 2) Load + normalize each JSON # # ------------------------------------------------------------------ # def load_trim_normalise(fp: str, cap: int = 10_000) -> Dataset: # ds = Dataset.from_json(fp) # # truncate if desired # ds = ds.select(range(min(cap, len(ds)))) # # ensure `options` exists and is always list[str] # if "options" not in ds.column_names: # ds = ds.add_column("options", [[]] * len(ds)) # else: # ds = ds.map( # lambda ex: {"options": [str(o) for o in (ex["options"] or [])]}, # remove_columns=[], # num_proc=4, # ) # # align to the common meta schema early (helps concat) # # Some JSONs may not have all fields; add missing with defaults first. # missing_cols = [k for k in common_features.keys() if k not in ds.column_names] # for mc in missing_cols: # # create sensible defaults # if mc == "options": # ds = ds.add_column(mc, [[]] * len(ds)) # elif common_features[mc].dtype == "int64": # ds = ds.add_column(mc, [0] * len(ds)) # else: # ds = ds.add_column(mc, [""] * len(ds)) # ds = ds.cast(common_features) # return ds # ds_list = [load_trim_normalise(fp) for fp in files] # # Concatenate shards # raw_train = concatenate_datasets(ds_list) # raw_ds = DatasetDict({"train": raw_train}) # # ------------------------------------------------------------------ # # 3) Processing fns # # ------------------------------------------------------------------ # def build_question(example: Dict) -> Dict: # """ # If multiple-choice, append the options to the text. # Overwrites the `problem` field in-place (kept in output). 
# """ # if example["problem_type"] == "multiple choice": # opts = example.get("options") or [] # q = example["problem"] + " Options:\n" + "\n".join(opts) # example["problem"] = q # return example # def extract_answer(predict: str) -> Optional[str]: # """ # Return inner text of ..., stripped. # If no tag is found, return the original string. # """ # if predict is None: # return None # match = re.search(r"([\s\S]*?)", predict, re.DOTALL) # if not match: # return predict # return match.group(1).strip() # def add_answer(example: Dict) -> Dict: # example["answer"] = extract_answer(example.get("solution", "")) # return example # def to_embedded_image(example: Dict) -> Dict: # """ # If data_type == 'image', embed bytes for HF Image() feature. # Otherwise leave as None. # """ # if example.get("data_type") != "image": # example["images"] = None # return example # path = example.get("path") # if not path: # example["images"] = None # return example # try: # with open(path, "rb") as f: # img_bytes = f.read() # example["images"] = {"bytes": img_bytes, "path": None} # except Exception: # # If image is missing or unreadable, keep None so cast still works # example["images"] = None # return example # # ------------------------------------------------------------------ # # 4) Apply pipeline (do NOT drop meta columns you want to keep) # # ------------------------------------------------------------------ # processed = ( # raw_ds["train"] # .map(build_question, num_proc=4) # .map(add_answer, num_proc=4) # .map(to_embedded_image, num_proc=4) # .cast(full_features) # <- ensure final schema # ) # # Optional: control output column ordering # processed = processed.select_columns(list(full_features.keys())) # # ------------------------------------------------------------------ # # 5) Write Parquet # # ------------------------------------------------------------------ # out_dir = Path("./hf_data") # out_dir.mkdir(parents=True, exist_ok=True) # out_path = out_dir / "train.parquet" # 
# processed.to_parquet(str(out_path))
# print("✓ Wrote:", out_path.resolve())
# print("Columns:", list(processed.features.keys()))

# ------------------------------------------------------------------
# 4.1) Downsample to 30k, mainly reducing math-heavy sources
# ------------------------------------------------------------------
# NOTE(review): this section relies on `processed` (the HF Dataset built by
# the pipeline that is commented out above) already being in scope. It will
# NameError if this file is executed on its own — re-enable the pipeline (or
# run in the same session) before running this part.
from collections import Counter
from pathlib import Path
from typing import Optional

# Third-party, but the same library the (commented-out) pipeline above uses.
# Previously this name was only imported inside the commented block → NameError.
from datasets import concatenate_datasets

TARGET_SIZE = 30_000  # total rows to keep after downsampling
MATH_SHARE = 0.20     # target fraction of math rows (~20%); tweak if you want
SEED = 2025           # fixed seed so the sample is reproducible

# Sources treated as "mathy" by exact name; any source whose name contains
# "math" (case-insensitive) is also counted — see is_math_source().
MATH_SOURCES = {
    "Multimath-300k",
    "TabMWP",
    "Geometry3K",
    "CLEVR-Math",
    "DVQA",
    "FigureQA",
    "ChartQA",
    "PlotQA",
    "EXAMS-V-train/Mathematics",
    "UniGeo",
    "GeoQA+",
}

def is_math_source(name: Optional[str]) -> bool:
    """Return True if `name` is a math-heavy data source.

    A source counts as math if it appears in MATH_SOURCES or its name
    contains the substring "math" (case-insensitive). None/"" → False.
    """
    if not name:
        return False
    return name in MATH_SOURCES or "math" in name.lower()

# Split into math / non-math pools.
math_ds = processed.filter(lambda ex: is_math_source(ex.get("data_source")), num_proc=4)
non_math_ds = processed.filter(lambda ex: not is_math_source(ex.get("data_source")), num_proc=4)

# Decide quotas: aim for TARGET_SIZE total with ~MATH_SHARE math rows.
non_math_quota = min(len(non_math_ds), int(TARGET_SIZE * (1 - MATH_SHARE)))
math_quota = min(TARGET_SIZE - non_math_quota, len(math_ds))  # guard: math pool may be small
# Backfill from the non-math pool when the math pool could not cover its
# share, so the final size still reaches TARGET_SIZE when enough data exists.
# (Previously a short math pool silently produced fewer than TARGET_SIZE rows.)
shortfall = TARGET_SIZE - non_math_quota - math_quota
if shortfall > 0:
    non_math_quota = min(len(non_math_ds), non_math_quota + shortfall)

# Sample deterministically.
non_math_sample = non_math_ds.shuffle(seed=SEED).select(range(non_math_quota))
math_sample = math_ds.shuffle(seed=SEED).select(range(math_quota))

# Combine and shuffle so math/non-math rows are interleaved.
final = concatenate_datasets([non_math_sample, math_sample]).shuffle(seed=SEED)

# Quick sanity printout of the 25 largest sources.
cnt = Counter(final["data_source"])
total = len(final)
print(f"Final size: {total} (non-math {non_math_quota}, math {math_quota})")
for src, n in cnt.most_common(25):  # idiomatic replacement for sorted(..., key=-count)[:25]
    pct = n / total if total else 0.0  # guard against an empty result
    print(f"{src:30s} {n:6d} {pct:7.3%}")

# Use this 'final' dataset for writing.
processed = final
# `out_dir` was only defined in the commented-out pipeline; recreate it here
# with the same value so this section stands on its own.
out_dir = Path("./hf_data")
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / "train_30k.parquet"
processed.to_parquet(str(out_path))
print("✓ Wrote:", out_path.resolve())