In [1]:
import ImarisLib
# ================================================================
# Imaris XT Python – 3D null model by random seeding and distance sampling
# - Runs Monte Carlo at multiple N (500, 1000, 2000, 4000 and N=existing spots)
# - Reads shortest 3D distances directly from a Distance-Transform channel
# - Uses per-slice fetching to avoid ICE IllegalMessageSizeException
# - Outputs per-iteration mean distances (and optional raw distances)
# - Writes a summary CSV including empirical P-values for the existing-N test
#
# Author: Ziyi Wang, ChatGPT (GPT-5 Thinking)
# Tested: Imaris 9/10 XT (Python) on Windows
# ================================================================
def GetServer():
  vImarisLib = ImarisLib.ImarisLib()
  vServer = vImarisLib.GetServer()
  return vServer

import time
import random
import os

In [2]:
# ------------------- USER PARAMETERS -------------------
VOXEL_COUNTS = [500, 1000, 2000, 4000]   # ### CHANGED: run these Ns
N_ITER    = 10000
RANDOM_SEED = 1457
OUT_DIR   = os.path.expanduser("~")
OUT_BASENAME = "Imaris-Spots-Ana"
SAVE_RAW_DISTANCES = False     # True = write every spot distance (large!)
SAMPLE_WITH_REPLACEMENT = True # True recommended when allowed mask is small
PREVIEW_CREATE_SPOTS_FIRST_ITER = True  # Make one Spots object to visually confirm placement
PREVIEW_SPOT_RADIUS_UM = 1.0   # display radius for preview spots

# Channel hints (or set explicit indices below)
DIST_CH_NAME_HINT  = "distance"  # e.g. "DistanceToVessel"
MASK_CH_NAME_HINT  = "mask"      # e.g. "PlacementMask"
DIST_CH_INDEX = None
MASK_CH_INDEX = None

TIMEPOINT = 0
SUBSAMPLE_MASK = 1
PROGRESS_EVERY = 100
CHECK_VOXEL_UNITS = True
# ------------------------------------------------------

In [3]:
# ---- Helpers ----
def _get_imaris_app():
    vlib = ImarisLib.ImarisLib()
    try:
        app = vlib.GetApplication(0)  # last opened
    except:
        app = None
    return app

def _find_channel_index_by_name(ds, hint_lower):
    n = ds.GetSizeC()
    for c in range(n):
        name = ds.GetChannelName(c) or ""
        if hint_lower in name.lower():
            return c
    return None

def _lin_to_xyz(idx, sx, sy):
    z = idx // (sx * sy)
    r = idx - z * sx * sy
    y = r // sx
    x = r - y * sx
    return x, y, z

def _voxel_to_um(ds, x, y, z, sx, sy, sz):
    px = ds.GetExtendMinX() + (x + 0.5) * (ds.GetExtendMaxX() - ds.GetExtendMinX()) / sx
    py = ds.GetExtendMinY() + (y + 0.5) * (ds.GetExtendMaxY() - ds.GetExtendMinY()) / sy
    pz = ds.GetExtendMinZ() + (z + 0.5) * (ds.GetExtendMaxZ() - ds.GetExtendMinZ()) / sz
    return px, py, pz

def _sanitize_name(s: str) -> str:
    return "".join(c if (c.isalnum() or c in ("-", "_")) else "_" for c in s) or "UntitledImage"

# Script directory (XT: __file__ may not exist)
try:
    script_dir = os.path.dirname(os.path.realpath(__file__))
except NameError:
    script_dir = os.getcwd()

# Build an image title from app (NOT from dataset)
def _get_image_title(app, ds) -> str:
    # 1) try current file name (if dataset was loaded from disk)
    try:
        fname = app.GetCurrentFileName()  # returns full path string in many Imaris versions
        if fname:
            return _sanitize_name(os.path.splitext(os.path.basename(fname))[0])
    except Exception:
        pass
    # 2) try Surpass scene name
    try:
        scene = app.GetSurpassScene()
        if scene is not None:
            nm = scene.GetName()
            if nm:
                return _sanitize_name(nm)
    except Exception:
        pass
    # 3) fallbacks: first channel name or generic
    try:
        nm = ds.GetChannelName(0)
        if nm:
            return _sanitize_name(nm)
    except Exception:
        pass
    return "UntitledImage"

import random as _R
def pick_indices(n):
    if SAMPLE_WITH_REPLACEMENT:
        return [_R.choice(allowed_lin) for _ in range(n)]
    else:
        if len(allowed_lin) < n:
            raise RuntimeError(f"Allowed voxels ({len(allowed_lin)}) < requested n ({n}) for sampling without replacement.")
        return _R.sample(allowed_lin, n)

def _um_to_voxel(ds, px, py, pz, sx, sy, sz):
    ex, ey, ez = ds.GetExtendMinX(), ds.GetExtendMinY(), ds.GetExtendMinZ()
    ex2, ey2, ez2 = ds.GetExtendMaxX(), ds.GetExtendMaxY(), ds.GetExtendMaxZ()
    x = int((px - ex) * sx / max(1e-12, (ex2 - ex)))
    y = int((py - ey) * sy / max(1e-12, (ey2 - ey)))
    z = int((pz - ez) * sz / max(1e-12, (ez2 - ez)))
    if x < 0: x = 0
    if y < 0: y = 0
    if z < 0: z = 0
    if x >= sx: x = sx - 1
    if y >= sy: y = sy - 1
    if z >= sz: z = sz - 1
    return x, y, z

def _empirical_p(null_vals, obs):
    # one-sided lower and upper + two-sided (clipped)
    n = float(len(null_vals))
    if n <= 0:
        return None, None, None
    lower = sum(1 for v in null_vals if v <= obs) / n
    upper = sum(1 for v in null_vals if v >= obs) / n
    two   = min(1.0, 2.0 * min(lower, upper))
    return lower, upper, two

def dist_at_xyz(x, y, z):
    return dist_slices[z][y * sx + x]

def mask_at_xyz(x, y, z):
    return mask_slices[z][y * sx + x]

In [4]:
random.seed(RANDOM_SEED)
app = _get_imaris_app()
if app is None:
    raise RuntimeError("Could not connect to Imaris. Open a dataset and retry.")
ds = app.GetDataSet()
if ds is None:
    raise RuntimeError("No dataset found.")

sx, sy, sz = ds.GetSizeX(), ds.GetSizeY(), ds.GetSizeZ()
sc, st = ds.GetSizeC(), ds.GetSizeT()
if st <= TIMEPOINT:
    raise RuntimeError(f"Requested TIMEPOINT {TIMEPOINT} but dataset has only {st} timepoints.")

# Locate channels
dist_ch = DIST_CH_INDEX if DIST_CH_INDEX is not None else _find_channel_index_by_name(ds, DIST_CH_NAME_HINT.lower())
mask_ch = MASK_CH_INDEX if MASK_CH_INDEX is not None else _find_channel_index_by_name(ds, MASK_CH_NAME_HINT.lower())
if dist_ch is None or dist_ch < 0 or dist_ch >= sc:
    raise RuntimeError(f"Distance channel not found. Set DIST_CH_INDEX or use a name containing '{DIST_CH_NAME_HINT}'.")
if mask_ch is None or mask_ch < 0 or mask_ch >= sc:
    raise RuntimeError(f"Mask channel not found. Set MASK_CH_INDEX or use a name containing '{MASK_CH_NAME_HINT}'.")

img_title = _get_image_title(app, ds)

print(f"We are working on {img_title}")
print(f"Distance channel  [{dist_ch}]: {ds.GetChannelName(dist_ch)}")
print(f"Mask channel      [{mask_ch}]: {ds.GetChannelName(mask_ch)}")

if CHECK_VOXEL_UNITS:
    vx = ds.GetExtendMaxX() - ds.GetExtendMinX()
    vy = ds.GetExtendMaxY() - ds.GetExtendMinY()
    vz = ds.GetExtendMaxZ() - ds.GetExtendMinZ()
    print(f"Vox dims: {sx}x{sy}x{sz} ; Extent (µm): {vx:.3f} x {vy:.3f} x {vz:.3f}")
    print(f"Voxel size (µm): {vx/sx:.6f} x {vy/sy:.6f} x {vz/sz:.6f}")
    print("NOTE: Distance channel values are assumed to be µm from the distance-transform XT.")

# Pull 3D volumes slice-by-slice
print("Fetching distance and mask volumes slice-by-slice…")
dist_slices = [None] * sz
mask_slices = [None] * sz
for z in range(sz):
    dist_slices[z] = ds.GetDataSubVolumeAs1DArrayFloats(0, 0, z, dist_ch, TIMEPOINT, sx, sy, 1)
    mask_slices[z] = ds.GetDataSubVolumeAs1DArrayBytes(0, 0, z, mask_ch, TIMEPOINT, sx, sy, 1)
    if (z+1) % 20 == 0 or z == sz-1:
        print(f"  …z {z+1}/{sz} loaded")

# -------- Build list of allowed voxel linear indices --------
print("[INFO] Indexing allowed voxels from mask…")
allowed_lin = []
for z in range(0, sz, SUBSAMPLE_MASK):
    ms = mask_slices[z]
    for y in range(0, sy, SUBSAMPLE_MASK):
        row_base = y * sx
        if SUBSAMPLE_MASK == 1:
            for x in range(sx):
                if ms[row_base + x] > 0:
                    allowed_lin.append((z * sy + y) * sx + x)
        else:
            for x in range(0, sx, SUBSAMPLE_MASK):
                if ms[row_base + x] > 0:
                    allowed_lin.append((z * sy + y) * sx + x)

if not allowed_lin:
    raise RuntimeError("Placement mask has no allowed voxels (>0).")
print(f"[INFO] Allowed voxels: {len(allowed_lin):,}")

We are working on 20220404_CD8_AF405_0325_Allo2_Stitch
Distance channel  [3]: distance
Mask channel      [4]: mask
Vox dims: 9387x3825x13 ; Extent (µm): 3896.857 x 1587.884 x 13.000
Voxel size (µm): 0.415133 x 0.415133 x 1.000000
NOTE: Distance channel values are assumed to be µm from the distance-transform XT.
Fetching distance and mask volumes slice-by-slice…
  …z 13/13 loaded
[INFO] Indexing allowed voxels from mask…
[INFO] Allowed voxels: 452,810,103


In [5]:
# ===========================================================
# ===== Measure EXISTING Spots and compute observed mean =====
# ===========================================================
factory = app.GetFactory()
scene   = app.GetSurpassScene()
ts = time.strftime("%Y%m%d_%H%M%S")

out_exist_csv = os.path.join(script_dir, f"{img_title}_ExistingSpots_toVessel_{ts}.csv")
wrote_header = False
existing_distances = []     # ### NEW: collect all measured d(spot, vessel) at TIMEPOINT
existing_total_count = 0

if scene is not None:
    n_children = scene.GetNumberOfChildren()
    with open(out_exist_csv, "w") as f:
        for ci in range(n_children):
            node = scene.GetChild(ci)
            sp   = factory.ToSpots(node)
            if sp is None:
                continue

            spots_name = sp.GetName() or f"Spots_{ci}"
            pos_xyz = sp.GetPositionsXYZ()
            idx_t   = sp.GetIndicesT()

            if not wrote_header:
                f.write("spots_name,spot_index,t,px,py,pz,dist_um\n")
                wrote_header = True

            for i, p in enumerate(pos_xyz):
                t_here = idx_t[i] if i < len(idx_t) else 0
                if t_here != TIMEPOINT:
                    continue

                px, py, pz = float(p[0]), float(p[1]), float(p[2])
                vx, vy, vz = _um_to_voxel(ds, px, py, pz, sx, sy, sz)
                d_um = dist_at_xyz(vx, vy, vz)
                existing_distances.append(d_um)
                existing_total_count += 1
                f.write(f"{spots_name},{i},{t_here},{px},{py},{pz},{d_um}\n")

if existing_total_count > 0:
    existing_mean = sum(existing_distances) / float(existing_total_count)
    print(f"[INFO] Measured existing Spots → {out_exist_csv}")
    print(f"[INFO] Existing at T={TIMEPOINT}: n={existing_total_count}, mean={existing_mean:.4f} µm")
else:
    existing_mean = None
    print("[WARN] No existing Spots measured at the requested TIMEPOINT; will skip P-value comparison.")

# ### NEW: include “same-N-as-existing” into the run set
if existing_total_count and (existing_total_count not in VOXEL_COUNTS):
    VOXEL_COUNTS = sorted(set(VOXEL_COUNTS + [existing_total_count]))

[INFO] Measured existing Spots → c:\Users\wong-ziyi\Documents\Asada_Imaris_py\20220404_CD8_AF405_0325_Allo2_Stitch_ExistingSpots_toVessel_20251023_183903.csv
[INFO] Existing at T=0: n=3280, mean=5.9179 µm


In [6]:
# ============================================
# ========== Monte Carlo for each N ==========
# ============================================
summary_rows = []  # will write to a summary CSV at the end
preview_iterations = []  # store a few first-iteration indices for preview spots

def run_mc_for_n(N_SPOTS, seed_offset=0):
    _R.seed(RANDOM_SEED + int(N_SPOTS) + int(seed_offset))  # deterministic per N
    if (not SAMPLE_WITH_REPLACEMENT) and len(allowed_lin) < N_SPOTS:
        raise RuntimeError(
            f"Allowed voxels ({len(allowed_lin)}) < N_SPOTS ({N_SPOTS}) when sampling without replacement."
        )

    ts_local = time.strftime("%Y%m%d_%H%M%S")
    base     = f"{OUT_BASENAME}_{N_SPOTS}vox_{N_ITER}iter"  # ### CHANGED: include N in base
    out_csv  = os.path.join(script_dir, f"{img_title}_{base}_{ts_local}.csv")
    out_raw  = os.path.join(script_dir, f"{img_title}_{base}_RAW_{ts_local}.csv")

    means = []
    if SAVE_RAW_DISTANCES:
        with open(out_raw, "w") as f:
            # ### CHANGED: include n_random_voxels in header context
            f.write("iteration,n_random_voxels," + ",".join(f"d{i+1}" for i in range(N_SPOTS)) + "\n")

    print(f"[INFO] Running Monte Carlo for N={N_SPOTS} …")
    first_iter_idxs = None

    for it in range(1, N_ITER + 1):
        idxs = pick_indices(N_SPOTS)
        if it == 1:
            first_iter_idxs = list(idxs)  # keep for preview
        ds_vals = []
        for idx in idxs:
            x, y, z = _lin_to_xyz(idx, sx, sy)
            ds_vals.append(dist_at_xyz(x, y, z))
        m = sum(ds_vals) / float(N_SPOTS)
        means.append(m)

        if SAVE_RAW_DISTANCES:
            with open(out_raw, "a") as f:
                f.write(str(it) + "," + str(N_SPOTS) + "," + ",".join(f"{v:.6f}" for v in ds_vals) + "\n")

        if it % PROGRESS_EVERY == 0:
            print(f"  …N={N_SPOTS} iteration {it}/{N_ITER} (mean={m:.3f})")

    # write per-iteration means (include N as a column)
    with open(out_csv, "w") as f:
        f.write("iteration,n_random_voxels,mean_distance_um\n")
        for i, m in enumerate(means, 1):
            f.write(f"{i},{N_SPOTS},{m}\n")

    mu = sum(means) / float(len(means))
    var = sum((x - mu) ** 2 for x in means) / float(max(1, len(means) - 1))
    sd  = var ** 0.5

    print(f"[INFO] N={N_SPOTS}: null mean = {mu:.4f} µm, sd = {sd:.4f} µm, n_iter = {len(means)}")
    print(f"[INFO] Saved per-iteration means to: {out_csv}")
    if SAVE_RAW_DISTANCES:
        print(f"[INFO] Saved raw distances to: {out_raw}")

    return {
        "N": N_SPOTS,
        "means": means,
        "mu": mu,
        "sd": sd,
        "out_csv": out_csv,
        "first_iter_idxs": first_iter_idxs
    }

# Run for each requested N
results_by_n = {}
for nvox in VOXEL_COUNTS:
    res = run_mc_for_n(nvox)
    results_by_n[nvox] = res
    if res["first_iter_idxs"] is not None:
        preview_iterations.append((nvox, res["first_iter_idxs"]))

[INFO] Running Monte Carlo for N=500 …
  …N=500 iteration 100/10000 (mean=21.124)
  …N=500 iteration 200/10000 (mean=17.296)
  …N=500 iteration 300/10000 (mean=22.730)
  …N=500 iteration 400/10000 (mean=22.691)
  …N=500 iteration 500/10000 (mean=19.462)
  …N=500 iteration 600/10000 (mean=21.795)
  …N=500 iteration 700/10000 (mean=20.774)
  …N=500 iteration 800/10000 (mean=20.657)
  …N=500 iteration 900/10000 (mean=17.956)
  …N=500 iteration 1000/10000 (mean=16.189)
  …N=500 iteration 1100/10000 (mean=19.655)
  …N=500 iteration 1200/10000 (mean=20.175)
  …N=500 iteration 1300/10000 (mean=20.359)
  …N=500 iteration 1400/10000 (mean=18.159)
  …N=500 iteration 1500/10000 (mean=22.917)
  …N=500 iteration 1600/10000 (mean=20.607)
  …N=500 iteration 1700/10000 (mean=20.826)
  …N=500 iteration 1800/10000 (mean=20.835)
  …N=500 iteration 1900/10000 (mean=20.576)
  …N=500 iteration 2000/10000 (mean=19.897)
  …N=500 iteration 2100/10000 (mean=22.122)
  …N=500 iteration 2200/10000 (mean=20.876)
  

In [7]:
# ============================================
# ========== Compute P-values & Summary ======
# ============================================
summary_ts = time.strftime("%Y%m%d_%H%M%S")
summary_path = os.path.join(script_dir, f"{img_title}_Summary_{summary_ts}.csv")

with open(summary_path, "w") as sf:
    sf.write("image_title,timepoint,n_random_voxels,n_iter,null_mean_um,null_sd_um,"
             "existing_mean_um,existing_n,p_lower,p_upper,p_two_sided,out_csv\n")
    for nvox, res in results_by_n.items():
        mu, sd = res["mu"], res["sd"]
        maybe_existing_mean = existing_mean if (existing_total_count and nvox == existing_total_count) else ""
        maybe_existing_n    = existing_total_count if (existing_total_count and nvox == existing_total_count) else ""
        pL = pU = p2 = ""
        if existing_total_count and nvox == existing_total_count:
            pL, pU, p2 = _empirical_p(res["means"], existing_mean)

        sf.write(
            f"{img_title},{TIMEPOINT},{nvox},{N_ITER},{mu:.6f},{sd:.6f},"
            f"{(f'{maybe_existing_mean:.6f}' if maybe_existing_mean!='' else '')},"
            f"{maybe_existing_n},{pL if pL=='' else f'{pL:.6f}'},"
            f"{pU if pU=='' else f'{pU:.6f}'},{p2 if p2=='' else f'{p2:.6f}'},"
            f"{res['out_csv']}\n"
        )

print(f"[INFO] Summary with P-values → {summary_path}")

[INFO] Summary with P-values → c:\Users\wong-ziyi\Documents\Asada_Imaris_py\20220404_CD8_AF405_0325_Allo2_Stitch_Summary_20251023_184345.csv


In [8]:
# -------- Optional preview: create Spots objects for the *first iteration* of each N --------
if PREVIEW_CREATE_SPOTS_FIRST_ITER and len(preview_iterations) > 0:
    factory = app.GetFactory()
    import array

    max_preview_sets = min(10, len(preview_iterations))
    for k in range(max_preview_sets):
        nvox, iter_indices = preview_iterations[k]

        vSpots = factory.CreateSpots()
        vSpots.SetName(f"Preview_RandomSpots_N{nvox}_iter1")

        xs, ys, zs = array.array('f'), array.array('f'), array.array('f')
        ts_arr = array.array('i')

        for idx in iter_indices:
            x, y, z = _lin_to_xyz(idx, sx, sy)
            px, py, pz = _voxel_to_um(ds, x, y, z, sx, sy, sz)
            xs.append(px); ys.append(py); zs.append(pz); ts_arr.append(TIMEPOINT)

        n = len(xs)
        positions = [[float(xs[i]), float(ys[i]), float(zs[i])] for i in range(n)]
        times     = [int(t) for t in ts_arr]
        radii     = [float(PREVIEW_SPOT_RADIUS_UM)] * n

        vSpots.Set(positions, times, radii)
        app.GetSurpassScene().AddChild(vSpots, -1)

    print(f"[INFO] Preview spots for first iterations of up to {max_preview_sets} N-sets added to Surpass.")


[INFO] Preview spots for first iterations of up to 5 N-sets added to Surpass.
