How to collect results#

Processors aggregate per-chunk outputs by returning dictionaries from the process() method. This guide covers building histograms, cutflow counters, and structured outputs that merge cleanly after parallel execution.

Return results from process()#

In modern coffea, simply return a dictionary from your process() method. For virtual/eager modes, nest results under the dataset name; for dask mode, return a flat dictionary.

import hist
import awkward as ak
from coffea import processor


class DimuonProcessor(processor.ProcessorABC):
    def __init__(self, mode="virtual"):
        assert mode in ["virtual", "eager", "dask"]
        self._mode = mode

    def process(self, events):
        dataset = events.metadata["dataset"]

        # Create histogram (use hist.dask.Hist for dask mode)
        if self._mode == "dask":
            hist_class = hist.dask.Hist
        else:
            hist_class = hist.Hist

        h_mass = hist_class.new.Reg(60, 60, 120, name="mass", label="mμμ [GeV]").Int64()

        muons = events.Muon[(events.Muon.tightId) & (events.Muon.pt > 25)]
        lead, trail = ak.unzip(ak.combinations(muons, 2))
        mass = (lead + trail).mass
        h_mass.fill(mass=mass)

        # For dask mode, return flat dict
        if self._mode == "dask":
            return {
                "entries": ak.num(events, axis=0),
                "mass": h_mass,
                "npairs": ak.sum(ak.num(mass, axis=1)),
            }
        # For virtual/eager mode, nest under dataset name
        else:
            return {
                dataset: {
                    "entries": len(events),
                    "mass": h_mass,
                    "npairs": int(ak.sum(ak.num(mass, axis=1))),
                }
            }

    def postprocess(self, accumulator):
        pass
  • No need for _accumulator, @property accumulator, or .identity() methods

  • Just return a plain dictionary

  • The framework automatically merges dictionaries and hist objects across chunks

Fill histograms#

Use hist directly without wrapper classes:

import awkward as ak
import hist

def process(self, events):
    dataset = events.metadata["dataset"]

    # Create histogram with category axis
    h_mass = hist.Hist.new.StrCat([], name="region", growth=True).Reg(
        60, 60, 120, name="mass", label="mμμ [GeV]"
    ).Weight()

    muons = events.Muon[(events.Muon.tightId) & (events.Muon.pt > 25)]
    lead, trail = ak.unzip(ak.combinations(muons, 2))
    mass = (lead + trail).mass

    # Fill with event weights
    h_mass.fill(region="signal", mass=mass, weight=events.genWeight)

    return {
        dataset: {
            "entries": len(events),
            "mass": h_mass,
        }
    }
  • Histogram axes grow dynamically when you use growth=True

  • Use ak.num to count objects per event

  • No need to call ak.to_numpy() or ak.flatten() - hist handles awkward arrays directly

Track multiple results#

Return multiple items in your dictionary:

from coffea.analysis_tools import Weights

def process(self, events):
    dataset = events.metadata["dataset"]

    # Track weights (use Weights(None) for dask mode)
    weights = Weights(len(events))
    weights.add("genWeight", events.genWeight)

    # Create histograms
    h_mass = hist.Hist.new.Reg(60, 60, 120, name="mass").Weight()
    h_pt = hist.Hist.new.Reg(50, 0, 200, name="pt").Weight()

    muons = events.Muon[(events.Muon.tightId) & (events.Muon.pt > 25)]
    lead, trail = ak.unzip(ak.combinations(muons, 2))
    dimuon = lead + trail

    h_mass.fill(mass=dimuon.mass, weight=weights.weight())
    h_pt.fill(pt=dimuon.pt, weight=weights.weight())

    return {
        dataset: {
            "entries": len(events),
            "sumw": float(ak.sum(events.genWeight)),
            "mass": h_mass,
            "pt": h_pt,
            "weightStats": weights.weightStatistics,
        }
    }
  • Include scalar values like event counts and sum of weights

  • Include hist objects for histograms

  • Include weightStatistics for debugging

  • All types will be automatically merged across chunks

Use postprocess to finalize results#

postprocess runs after all chunks are merged. Use it to compute derived quantities:

def postprocess(self, accumulator):
    # accumulator is the merged dictionary from all chunks
    for dataset in accumulator:
        entries = accumulator[dataset]["entries"]
        sumw = accumulator[dataset]["sumw"]
        # Add derived quantities
        accumulator[dataset]["avgWeight"] = sumw / entries if entries > 0 else 0

    return accumulator

Serialize results#

Histograms and dictionaries are picklable; save them to disk using coffea.util.save:

import coffea.util

result = runner(fileset, processor_instance=my_processor)
coffea.util.save(result, "out.coffea")

# Reload later
loaded = coffea.util.load("out.coffea")

When to use accumulator classes#

You don’t need accumulator classes for most use cases:

  • Histograms: Use hist.Hist directly - they already support merging

  • Scalar sums: Return plain Python int or float - they’re automatically summed

  • Dictionaries: Return plain Python dict - nested values are automatically merged

Use accumulator classes when you need specialized merging behavior:

  • Concatenation: Use column_accumulator or list_accumulator to append arrays or lists

  • Union: Use set_accumulator to collect unique values

  • Auto-initialization: Use defaultdict_accumulator for counters that self-initialize

Accumulator classes reference#

column_accumulator - Collect arrays across chunks#

Use column_accumulator to append numpy or awkward arrays from each chunk:

from coffea.processor import column_accumulator
import awkward as ak

def process(self, events):
    dataset = events.metadata["dataset"]

    # Select events and collect their kinematics
    selected_events = events[ak.max(events.Muon.pt, axis=1) > 50]
    event_ids = ak.to_numpy(selected_events.event)

    return {
        dataset: {
            "entries": len(events),
            "selected_event_ids": column_accumulator(event_ids),
        }
    }

The framework automatically concatenates arrays across chunks.

list_accumulator - Collect lists across chunks#

Use list_accumulator to collect small lists of values:

from coffea.processor import list_accumulator
import awkward as ak

def process(self, events):
    dataset = events.metadata["dataset"]

    # Collect metadata about interesting events
    mask = events.MET.pt > 200
    runs = ak.to_numpy(events.run[mask])
    lumis = ak.to_numpy(events.luminosityBlock[mask])
    event_ids = ak.to_numpy(events.event[mask])

    interesting = [(int(r), int(l), int(e))
                   for r, l, e in zip(runs, lumis, event_ids)]

    return {
        dataset: {
            "entries": len(events),
            "interesting_events": list_accumulator(interesting),
        }
    }

set_accumulator - Collect unique values#

Use set_accumulator to collect unique values across chunks:

from coffea.processor import set_accumulator
import awkward as ak

def process(self, events):
    dataset = events.metadata["dataset"]

    # Collect unique run numbers
    unique_runs = set_accumulator(set(ak.to_numpy(events.run)))

    return {
        dataset: {
            "entries": len(events),
            "runs": unique_runs,
        }
    }

defaultdict_accumulator - Simple counters#

Use defaultdict_accumulator for counters that automatically initialize:

from coffea.processor import defaultdict_accumulator

def process(self, events):
    dataset = events.metadata["dataset"]

    cutflow = defaultdict_accumulator(int)
    cutflow["total"] += len(events)
    cutflow["has_muon"] += int(ak.sum(ak.num(events.Muon, axis=1) > 0))
    cutflow["has_2muon"] += int(ak.sum(ak.num(events.Muon, axis=1) >= 2))

    return {
        dataset: {
            "cutflow": cutflow,
        }
    }

value_accumulator - Accumulate arbitrary types#

Use value_accumulator to accumulate values with custom types:

from coffea.processor import value_accumulator

def process(self, events):
    dataset = events.metadata["dataset"]

    # Track sum of weights
    sumw = value_accumulator(float, float(ak.sum(events.genWeight)))

    return {
        dataset: {
            "sumw": sumw,
        }
    }

Tips & tricks#

  • Return plain dictionaries - the framework handles merging automatically

  • Use hist.Hist for virtual/eager modes and hist.dask.Hist for dask mode

  • For simple cutflow counters, use regular Python dicts with integer values or defaultdict_accumulator(int)

  • Use accumulator classes when you need specialized merging behavior (concatenation, union, etc.)

  • Avoid early returns like if len(events) == 0: return - always return the full dictionary structure

  • Test with processor.IterativeExecutor first to validate the output structure

  • For large numpy arrays (>100 MB), consider writing them to disk inside process instead of using column_accumulator