import csv
import json
import mmap
import struct
import uuid
from collections import Counter, defaultdict
from pathlib import Path


# Minidump file that main() opens and memory-maps for scanning.
DUMP_PATH = Path(r"C:\dumps\В памяти много кэшей\rphost.exe_16460.dmp")
# Directory that write_outputs() creates (if needed) and fills with reports.
OUT_DIR = Path(r"C:\dumps\В памяти много кэшей\uid_memory_stats")

# Type name -> UID text. build_patterns() turns every UID into several byte
# encodings (ASCII, UTF-16LE, raw UUID bytes) that are searched for in the dump.
# English glosses of the Russian keys, in order: Array, Structure,
# Map ("Correspondence"), ValueList, ValueTable, ValueTree, FixedArray,
# FixedStructure, FixedMap.
UIDS = {
    "Массив": "51e7a0d2-530b-11d4-b98a-008048da3034",
    "Структура": "4238019d-7e49-4fc9-91db-b6b951d5cf8e",
    "Соответствие": "3d48feae-a9c6-4c5a-a099-9eb6477630c6",
    "СписокЗначений": "4772b3b4-f4a3-49c0-a1a5-8cb5961511a3",
    "ТаблицаЗначений": "acf6192e-81ca-46ef-93a6-5a6968b78663",
    "ДеревоЗначений": "e603c0f2-92fb-4d47-8f38-a44a381cf235",
    "ФиксированныйМассив": "4500381b-db30-4a10-9db4-990038032acf",
    "ФиксированнаяСтруктура": "3ee983d7-ace7-40f9-bb7e-2e916fcddd56",
    "ФиксированноеСоответствие": "220455ea-6c85-4513-996f-bbe79ed07774",
}

# Stream-type codes that parse_minidump_ranges() looks for in the stream
# directory: 5 is parsed as a 32-bit memory list ("MemoryList"), 9 as a
# 64-bit memory list ("Memory64List").
STREAM_MEMORY_LIST = 5
STREAM_MEMORY_64_LIST = 9


def read_u32(data, offset):
    """Return the unsigned 32-bit little-endian integer at *offset* in *data*.

    Args:
        data: Buffer to read from (bytes, mmap, ...).
        offset: Byte position of the first of the four bytes.

    Raises:
        struct.error: If fewer than four bytes are available at *offset*.
    """
    (value,) = struct.unpack_from("<I", data, offset)
    return value


def read_u64(data, offset):
    """Return the unsigned 64-bit little-endian integer at *offset* in *data*.

    Args:
        data: Buffer to read from (bytes, mmap, ...).
        offset: Byte position of the first of the eight bytes.

    Raises:
        struct.error: If fewer than eight bytes are available at *offset*.
    """
    (value,) = struct.unpack_from("<Q", data, offset)
    return value


def parse_minidump_ranges(mm):
    """Read the stream directory of a minidump and collect its memory ranges.

    The stream count is read from offset 8 and the directory position from
    offset 12; every directory entry is 12 bytes (type, data size, rva).
    Streams of type ``STREAM_MEMORY_LIST`` and ``STREAM_MEMORY_64_LIST`` are
    decoded into range descriptors.

    Args:
        mm: Buffer holding the whole dump file (mmap or bytes).

    Returns:
        ``(streams, ranges)`` where *streams* is a list of
        ``(stream_type, data_size, rva)`` tuples in directory order and
        *ranges* is a list of dicts with keys ``va``, ``file_offset``,
        ``size`` and ``stream``, sorted by ``file_offset``.

    Raises:
        ValueError: If the buffer does not start with ``b"MDMP"``.
    """
    if mm[:4] != b"MDMP":
        raise ValueError("Not a minidump file")

    stream_total = read_u32(mm, 8)
    directory_rva = read_u32(mm, 12)
    streams = []
    collected = []

    for index in range(stream_total):
        entry_pos = directory_rva + 12 * index
        stream_type = read_u32(mm, entry_pos)
        data_size = read_u32(mm, entry_pos + 4)
        rva = read_u32(mm, entry_pos + 8)
        streams.append((stream_type, data_size, rva))

        if stream_type == STREAM_MEMORY_LIST:
            # 32-bit count, then 16-byte descriptors: va (u64), size (u32),
            # file position of the data (u32).
            descriptor_total = read_u32(mm, rva)
            for n in range(descriptor_total):
                desc = rva + 4 + 16 * n
                va = read_u64(mm, desc)
                length = read_u32(mm, desc + 8)
                data_rva = read_u32(mm, desc + 12)
                collected.append(
                    {
                        "va": va,
                        "file_offset": data_rva,
                        "size": length,
                        "stream": "MemoryList",
                    }
                )

        if stream_type == STREAM_MEMORY_64_LIST:
            # 64-bit count and base file position, then 16-byte descriptors
            # (va, size). The data of consecutive ranges is laid out back to
            # back starting at the base position, so a running cursor is kept.
            descriptor_total = read_u64(mm, rva)
            data_cursor = read_u64(mm, rva + 8)
            for n in range(descriptor_total):
                desc = rva + 16 + 16 * n
                va = read_u64(mm, desc)
                length = read_u64(mm, desc + 8)
                collected.append(
                    {
                        "va": va,
                        "file_offset": data_cursor,
                        "size": length,
                        "stream": "Memory64List",
                    }
                )
                data_cursor += length

    return streams, sorted(collected, key=lambda item: item["file_offset"])


def iter_find(mm, needle, start, end):
    """Yield every offset in ``mm[start:end]`` where *needle* occurs.

    The search resumes one byte after each match, so overlapping
    occurrences are reported too.

    Args:
        mm: Object with a ``find(needle, start, end)`` method (mmap, bytes).
        needle: Byte sequence to look for.
        start: Offset where the search begins.
        end: Offset where the search stops (exclusive).

    Yields:
        Absolute offsets of the matches, in increasing order.
    """
    cursor = mm.find(needle, start, end)
    while cursor >= 0:
        yield cursor
        cursor = mm.find(needle, cursor + 1, end)


def build_patterns():
    """Build the list of byte patterns to search for, six per entry of ``UIDS``.

    Each UID is encoded as lower/upper-case ASCII text, lower/upper-case
    UTF-16LE text, and as the raw 16 bytes in both ``bytes_le`` and ``bytes``
    order of ``uuid.UUID``.

    Returns:
        A list of dicts with keys ``type_name``, ``uid``, ``variant`` and
        ``needle`` (the bytes to search for), grouped by type in ``UIDS`` order.
    """
    patterns = []
    for type_name, uid_text in UIDS.items():
        parsed = uuid.UUID(uid_text)
        lower_text = uid_text.lower()
        upper_text = uid_text.upper()
        encodings = (
            ("ascii-lower", lower_text.encode("ascii")),
            ("ascii-upper", upper_text.encode("ascii")),
            ("utf16le-lower", lower_text.encode("utf-16le")),
            ("utf16le-upper", upper_text.encode("utf-16le")),
            ("uuid-bytes-le", parsed.bytes_le),
            ("uuid-bytes-be", parsed.bytes),
        )
        patterns.extend(
            {
                "type_name": type_name,
                "uid": uid_text,
                "variant": variant,
                "needle": needle,
            }
            for variant, needle in encodings
        )
    return patterns


def scan_ranges(mm, ranges, patterns):
    """Search every memory range of the dump for every pattern.

    Ranges whose file span starts below zero or ends past the end of *mm*
    are skipped. Matching never crosses a range boundary because each search
    is limited to the range's own file span.

    Args:
        mm: Buffer holding the whole dump file.
        ranges: Range dicts with ``va``, ``file_offset`` and ``size``.
        patterns: Pattern dicts with ``type_name``, ``uid``, ``variant`` and
            ``needle``.

    Returns:
        ``(hits, per_type, per_type_variant, per_range, per_chunk)``:
        the list of hit dicts, hit counts per type name, per
        ``(type name, variant)``, per range index (a Counter of type names),
        and per ``(range index, 1 MiB chunk index within the range)``
        (also a Counter of type names).
    """
    chunk_bytes = 1024 * 1024
    hits = []
    per_type = Counter()
    per_type_variant = Counter()
    per_range = defaultdict(Counter)
    per_chunk = defaultdict(Counter)

    for r_index, mem in enumerate(ranges):
        first = mem["file_offset"]
        last = first + mem["size"]
        if not (first >= 0 and last <= len(mm)):
            # The range's data is not fully present in the file.
            continue

        for pattern in patterns:
            for found_at in iter_find(mm, pattern["needle"], first, last):
                delta = found_at - first
                # Translate the file position back to the process address.
                address = mem["va"] + delta
                type_name = pattern["type_name"]
                variant = pattern["variant"]
                hits.append(
                    {
                        "type": type_name,
                        "uid": pattern["uid"],
                        "variant": variant,
                        "file_offset": found_at,
                        "virtual_address": address,
                        "range_index": r_index,
                        "range_va": mem["va"],
                        "range_size": mem["size"],
                        "range_file_offset": mem["file_offset"],
                    }
                )
                per_type[type_name] += 1
                per_type_variant[type_name, variant] += 1
                per_range[r_index][type_name] += 1
                per_chunk[r_index, delta // chunk_bytes][type_name] += 1

    return hits, per_type, per_type_variant, per_range, per_chunk


def estimate_marker_spans(hits, ranges):
    """Estimate how many bytes follow each ``uuid-bytes-le`` hit.

    Only hits whose variant is ``"uuid-bytes-le"`` are considered. Within one
    range they are ordered by file offset, and the span of a hit is the
    distance to the next such hit, or to the end of the range for the last
    one, clamped to the interval 0 .. 1 MiB.

    Args:
        hits: Hit dicts with ``variant``, ``range_index``, ``file_offset``,
            ``type``, ``virtual_address`` and ``range_va``.
        ranges: Range dicts with ``file_offset`` and ``size``, indexed by the
            hits' ``range_index``.

    Returns:
        ``(span_by_type, span_count, samples)``: summed span per type,
        number of spans per type, and up to 200 sample dicts describing the
        first spans encountered.
    """
    cap = 1024 * 1024
    sample_limit = 200

    grouped = defaultdict(list)
    for hit in hits:
        if hit["variant"] != "uuid-bytes-le":
            continue
        grouped[hit["range_index"]].append(hit)

    span_by_type = Counter()
    span_count = Counter()
    samples = []
    for r_index, markers in grouped.items():
        ordered = sorted(markers, key=lambda marker: marker["file_offset"])
        owner = ranges[r_index]
        range_end = owner["file_offset"] + owner["size"]
        # Each marker is bounded by its successor; the last one by the range end.
        boundaries = [marker["file_offset"] for marker in ordered[1:]]
        boundaries.append(range_end)

        for marker, boundary in zip(ordered, boundaries):
            span = min(max(boundary - marker["file_offset"], 0), cap)
            kind = marker["type"]
            span_by_type[kind] += span
            span_count[kind] += 1
            if len(samples) >= sample_limit:
                continue
            samples.append(
                {
                    "type": kind,
                    "virtual_address": f"0x{marker['virtual_address']:X}",
                    "range_va": f"0x{marker['range_va']:X}",
                    "estimated_span_bytes_capped_1mb": span,
                }
            )
    return span_by_type, span_count, samples


def write_outputs(streams, ranges, hits, per_type, per_type_variant, per_range, per_chunk):
    """Write all report files into ``OUT_DIR`` and return the summary tables.

    Files are produced in this order: ``minidump_streams.json``,
    ``memory_ranges.csv``, ``uid_hits.csv``, ``uid_summary.csv``,
    ``uid_dense_chunks.csv`` (at most the 5000 rows with the most hits),
    ``span_samples.json`` and ``README.md``. ``OUT_DIR`` is created first if
    it does not exist.

    Args:
        streams: ``(stream_type, data_size, rva)`` tuples.
        ranges: Range dicts with ``va``, ``file_offset``, ``size``, ``stream``.
        hits: Hit dicts as produced by ``scan_ranges``.
        per_type: Hit count per type name.
        per_type_variant: Hit count per ``(type name, variant)``.
        per_range: Accepted for interface symmetry; not used here.
        per_chunk: Counter of type names per ``(range index, chunk index)``.

    Returns:
        ``(summary_rows, dense_rows)``: one summary dict per entry of
        ``UIDS``, and all non-empty chunk rows sorted by descending
        ``hits_total`` (not truncated, unlike the CSV file).
    """

    def save_json(file_name, payload):
        with (OUT_DIR / file_name).open("w", encoding="utf-8") as out:
            json.dump(payload, out, ensure_ascii=False, indent=2)

    def save_csv(file_name, columns, rows):
        # newline="" leaves line-ending handling to the csv module.
        with (OUT_DIR / file_name).open("w", encoding="utf-8", newline="") as out:
            table = csv.DictWriter(out, fieldnames=columns)
            table.writeheader()
            table.writerows(rows)

    OUT_DIR.mkdir(parents=True, exist_ok=True)

    save_json(
        "minidump_streams.json",
        [
            {"type": stream_type, "data_size": data_size, "rva": rva}
            for stream_type, data_size, rva in streams
        ],
    )

    save_csv(
        "memory_ranges.csv",
        ["index", "stream", "va_hex", "file_offset", "size", "size_mb"],
        (
            {
                "index": position,
                "stream": mem["stream"],
                "va_hex": f"0x{mem['va']:X}",
                "file_offset": mem["file_offset"],
                "size": mem["size"],
                "size_mb": round(mem["size"] / 1024 / 1024, 3),
            }
            for position, mem in enumerate(ranges)
        ),
    )

    save_csv(
        "uid_hits.csv",
        [
            "type",
            "uid",
            "variant",
            "file_offset_hex",
            "virtual_address_hex",
            "range_index",
            "range_va_hex",
            "range_size",
        ],
        (
            {
                "type": found["type"],
                "uid": found["uid"],
                "variant": found["variant"],
                "file_offset_hex": f"0x{found['file_offset']:X}",
                "virtual_address_hex": f"0x{found['virtual_address']:X}",
                "range_index": found["range_index"],
                "range_va_hex": f"0x{found['range_va']:X}",
                "range_size": found["range_size"],
            }
            for found in hits
        ),
    )

    span_by_type, _span_count, span_samples = estimate_marker_spans(hits, ranges)

    # One summary row per known type, in UIDS order.
    total_hits = sum(per_type.values())
    summary_rows = []
    for type_name, uid_text in UIDS.items():
        type_hits = per_type[type_name]
        if total_hits:
            share_pct = round(type_hits * 100 / total_hits, 3)
        else:
            share_pct = 0
        summary_rows.append(
            {
                "type": type_name,
                "uid": uid_text,
                "hits_total": type_hits,
                "hits_uuid_bytes_le": per_type_variant.get((type_name, "uuid-bytes-le"), 0),
                "hits_utf16le_lower": per_type_variant.get((type_name, "utf16le-lower"), 0),
                "hits_ascii_lower": per_type_variant.get((type_name, "ascii-lower"), 0),
                "estimated_span_mb_capped": round(span_by_type[type_name] / 1024 / 1024, 3),
                "share_of_all_hits_pct": share_pct,
            }
        )

    save_csv("uid_summary.csv", list(summary_rows[0]), summary_rows)

    # One row per 1 MiB chunk that contains at least one hit.
    dense_rows = []
    for (range_index, chunk_index), counts in per_chunk.items():
        chunk_total = sum(counts.values())
        if not chunk_total:
            continue
        base_va = ranges[range_index]["va"]
        row = {
            "range_index": range_index,
            "range_va_hex": f"0x{base_va:X}",
            "chunk_index_1mb": chunk_index,
            "chunk_va_hex": f"0x{base_va + chunk_index * 1024 * 1024:X}",
            "hits_total": chunk_total,
        }
        for name in UIDS:
            row[f"hits_{name}"] = counts[name]
        dense_rows.append(row)
    dense_rows.sort(key=lambda row: row["hits_total"], reverse=True)

    dense_fields = [
        "range_index",
        "range_va_hex",
        "chunk_index_1mb",
        "chunk_va_hex",
        "hits_total",
    ]
    dense_fields.extend(f"hits_{name}" for name in UIDS)
    save_csv("uid_dense_chunks.csv", dense_fields, dense_rows[:5000])

    save_json("span_samples.json", span_samples)

    with (OUT_DIR / "README.md").open("w", encoding="utf-8") as readme:
        readme.write(
            "# UID memory stats\n\n"
            "Скрипт ищет известные UID типов 1С в дампе в ASCII, UTF-16LE, "
            "big-endian UUID и little-endian UUID представлениях.\n\n"
            "Колонка `estimated_span_mb_capped` не является точным размером объектов. "
            "Это грубая эвристика: расстояние от binary little-endian UID до следующей "
            "такой метки в том же memory range, ограниченное 1 МБ на одну находку.\n"
        )

    return summary_rows, dense_rows


def main():
    """Scan the dump at ``DUMP_PATH``, write the reports and print a summary.

    The dump is memory-mapped read-only for the duration of parsing,
    scanning and report writing; the console summary is printed after the
    mapping has been closed.
    """
    with DUMP_PATH.open("rb") as dump_file, mmap.mmap(
        dump_file.fileno(), 0, access=mmap.ACCESS_READ
    ) as view:
        streams, ranges = parse_minidump_ranges(view)
        scan_result = scan_ranges(view, ranges, build_patterns())
        # scan_result is (hits, per_type, per_type_variant, per_range, per_chunk).
        summary_rows, dense_rows = write_outputs(streams, ranges, *scan_result)

    hits = scan_result[0]
    print(f"Ranges: {len(ranges)}")
    print(f"Hits: {len(hits)}")
    print(f"Output: {OUT_DIR}")

    print("\nSummary:")
    for row in summary_rows:
        line = (
            f"{row['type']}: hits={row['hits_total']} "
            f"bin_le={row['hits_uuid_bytes_le']} "
            f"span_mb~={row['estimated_span_mb_capped']}"
        )
        print(line)

    print("\nTop dense chunks:")
    for row in dense_rows[:10]:
        print(row)


# Run the analysis only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
