How to collect results
Processors aggregate per-chunk outputs by returning dictionaries from the process() method. This guide covers building histograms, cutflow counters, and structured outputs that merge cleanly after parallel execution.
Return results from process()
In modern coffea, simply return a dictionary from your process() method. For virtual/eager modes, nest results under the dataset name; for dask mode, return a flat dictionary.
import hist
import awkward as ak
from coffea import processor


class DimuonProcessor(processor.ProcessorABC):
    """Histogram the invariant mass of tight, high-pT muon pairs.

    Parameters
    ----------
    mode : str
        One of ``"virtual"``, ``"eager"`` or ``"dask"``. It selects the
        histogram class (``hist.Hist`` or ``hist.dask.Hist``) and the shape of
        the dictionary returned by ``process``.

    Raises
    ------
    ValueError
        If ``mode`` is not one of the supported modes.
    """

    _MODES = ("virtual", "eager", "dask")

    def __init__(self, mode="virtual"):
        # Raise instead of ``assert`` so the check survives ``python -O``.
        if mode not in self._MODES:
            raise ValueError(f"mode must be one of {self._MODES}, got {mode!r}")
        self._mode = mode

    def process(self, events):
        """Fill the dimuon mass histogram for one chunk of events.

        Returns a flat dictionary in dask mode, and a dictionary nested under
        the dataset name in virtual/eager mode.
        """
        dataset = events.metadata["dataset"]

        if self._mode == "dask":
            # ``import hist`` does not load the ``hist.dask`` submodule, so import
            # it explicitly. Importing here keeps dask-histogram optional for
            # virtual/eager users, and the ``as`` alias avoids shadowing ``hist``.
            import hist.dask as hist_dask

            hist_class = hist_dask.Hist
        else:
            hist_class = hist.Hist
        h_mass = hist_class.new.Reg(60, 60, 120, name="mass", label="mμμ [GeV]").Int64()

        muons = events.Muon[(events.Muon.tightId) & (events.Muon.pt > 25)]
        lead, trail = ak.unzip(ak.combinations(muons, 2))
        mass = (lead + trail).mass  # jagged: one list of pair masses per event
        # Flatten the per-event lists so each muon pair becomes one entry.
        h_mass.fill(mass=ak.flatten(mass))

        if self._mode == "dask":
            # Dask mode: return a flat dictionary.
            return {
                "entries": ak.num(events, axis=0),
                "mass": h_mass,
                "npairs": ak.sum(ak.num(mass, axis=1)),
            }
        # Virtual/eager mode: nest the results under the dataset name.
        return {
            dataset: {
                "entries": len(events),
                "mass": h_mass,
                "npairs": int(ak.sum(ak.num(mass, axis=1))),
            }
        }

    def postprocess(self, accumulator):
        """Return the merged accumulator unchanged."""
        return accumulator
- No need for `_accumulator`, an `@property accumulator`, or `.identity()` methods
- Just return a plain dictionary
- The framework automatically merges dictionaries and hist objects across chunks
Fill histograms
Use hist directly without wrapper classes:
import awkward as ak
import hist


def process(self, events):
    """Fill a weighted dimuon-mass histogram split by a growable region axis.

    Returns a dictionary nested under the dataset name with the event count
    and the histogram.
    """
    dataset = events.metadata["dataset"]

    # Category axis with growth=True: new region labels are added on first fill.
    h_mass = (
        hist.Hist.new.StrCat([], name="region", growth=True)
        .Reg(60, 60, 120, name="mass", label="mμμ [GeV]")
        .Weight()
    )

    muons = events.Muon[(events.Muon.tightId) & (events.Muon.pt > 25)]
    lead, trail = ak.unzip(ak.combinations(muons, 2))
    mass = (lead + trail).mass  # jagged: one list of pair masses per event

    # genWeight has one value per event, but mass has one value per pair.
    # Give every pair its event's weight, then flatten both so they line up.
    pair_weight = ak.broadcast_arrays(events.genWeight, mass)[0]
    h_mass.fill(
        region="signal",
        mass=ak.flatten(mass),
        weight=ak.flatten(pair_weight),
    )

    return {
        dataset: {
            "entries": len(events),
            "mass": h_mass,
        }
    }
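- Axes created with `growth=True` (like the `region` category axis above) gain new bins as new values are filled
- Use `ak.num` to count objects per event
- Flat awkward arrays can be passed to `fill` directly, without `ak.to_numpy()`. Jagged (per-event list) arrays such as pair masses should be flattened with `ak.flatten()`. Per-event weights must first be broadcast to the same structure (e.g. with `ak.broadcast_arrays`) so that every filled entry has a matching weight.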
Track multiple results
Return multiple items in your dictionary:
import awkward as ak
import hist
from coffea.analysis_tools import Weights


def process(self, events):
    """Fill weighted dimuon mass and pT histograms plus bookkeeping values.

    Returns a dictionary nested under the dataset name holding the event
    count, the sum of generator weights, both histograms and the weight
    statistics.
    """
    dataset = events.metadata["dataset"]

    # Track weights (use Weights(None) for dask mode)
    weights = Weights(len(events))
    weights.add("genWeight", events.genWeight)

    # Create histograms
    h_mass = hist.Hist.new.Reg(60, 60, 120, name="mass").Weight()
    h_pt = hist.Hist.new.Reg(50, 0, 200, name="pt").Weight()

    muons = events.Muon[(events.Muon.tightId) & (events.Muon.pt > 25)]
    lead, trail = ak.unzip(ak.combinations(muons, 2))
    dimuon = lead + trail  # jagged: one list of pairs per event

    # weights.weight() holds one value per event; give each dimuon pair the
    # weight of its event, then flatten values and weights together.
    pair_weight = ak.flatten(ak.broadcast_arrays(weights.weight(), dimuon.mass)[0])
    h_mass.fill(mass=ak.flatten(dimuon.mass), weight=pair_weight)
    h_pt.fill(pt=ak.flatten(dimuon.pt), weight=pair_weight)

    return {
        dataset: {
            "entries": len(events),
            "sumw": float(ak.sum(events.genWeight)),
            "mass": h_mass,
            "pt": h_pt,
            "weightStats": weights.weightStatistics,
        }
    }
- Include scalar values like event counts and sum of weights
- Include hist objects for histograms
- Include `weightStatistics` for debugging
- All of these types are merged automatically across chunks
Use postprocess to finalize results
postprocess runs after all chunks are merged. Use it to compute derived quantities:
def postprocess(self, accumulator):
    """Attach each dataset's average event weight to the merged output.

    ``accumulator`` is the dictionary merged from every chunk, keyed by
    dataset name; each per-dataset entry must provide ``"entries"`` and
    ``"sumw"``. A new ``"avgWeight"`` key is written in place, and the same
    dictionary is returned.
    """
    for per_dataset in accumulator.values():
        n_events = per_dataset["entries"]
        total_weight = per_dataset["sumw"]
        # Datasets that contributed no events get 0 instead of dividing by zero.
        if n_events > 0:
            per_dataset["avgWeight"] = total_weight / n_events
        else:
            per_dataset["avgWeight"] = 0
    return accumulator
Serialize results
Histograms and dictionaries are picklable; save them to disk using coffea.util.save:
import coffea.util

# `runner`, `fileset` and `my_processor` are assumed to come from your own setup.
result = runner(fileset, processor_instance=my_processor)

output_path = "out.coffea"
coffea.util.save(result, output_path)

# Reload later, e.g. in a separate plotting script.
# The file is pickle-based, so only load files you trust.
loaded = coffea.util.load(output_path)
When to use accumulator classes
You don’t need accumulator classes for most use cases:
- Histograms: use `hist.Hist` directly - histograms already support merging
- Scalar sums: return plain Python `int` or `float` values - they are summed automatically
- Dictionaries: return a plain Python `dict` - nested values are merged automatically
Use accumulator classes when you need specialized merging behavior:
- Concatenation: use `column_accumulator` or `list_accumulator` to append arrays or lists
- Union: use `set_accumulator` to collect unique values
- Auto-initialization: use `defaultdict_accumulator` for counters that self-initialize
Accumulator classes reference
column_accumulator - Collect arrays across chunks
Use column_accumulator to append numpy or awkward arrays from each chunk:
from coffea.processor import column_accumulator
import awkward as ak


def process(self, events):
    """Collect the event numbers of events with a muon above 50 GeV in pT.

    The IDs are wrapped in a ``column_accumulator`` so arrays from all chunks
    are concatenated.
    """
    dataset = events.metadata["dataset"]

    # ak.any over the muon axis is False for events without muons. The
    # previous ak.max(...) > 50 gives None there, and an option-type mask
    # leaks missing values into the selection.
    has_hard_muon = ak.any(events.Muon.pt > 50, axis=1)
    selected_events = events[has_hard_muon]
    event_ids = ak.to_numpy(selected_events.event)

    return {
        dataset: {
            "entries": len(events),
            "selected_event_ids": column_accumulator(event_ids),
        }
    }
The framework automatically concatenates arrays across chunks.
list_accumulator - Collect lists across chunks
Use list_accumulator to collect small lists of values:
from coffea.processor import list_accumulator
import awkward as ak


def process(self, events):
    """Record ``(run, lumi, event)`` tuples for events with MET above 200 GeV.

    The tuples hold plain Python ints and are wrapped in a
    ``list_accumulator`` so the lists from every chunk are appended together.
    """
    dataset = events.metadata["dataset"]

    high_met = events.MET.pt > 200
    id_columns = (events.run, events.luminosityBlock, events.event)
    run_arr, lumi_arr, evt_arr = (ak.to_numpy(col[high_met]) for col in id_columns)
    interesting = list(zip(map(int, run_arr), map(int, lumi_arr), map(int, evt_arr)))

    return {
        dataset: {
            "entries": len(events),
            "interesting_events": list_accumulator(interesting),
        }
    }
set_accumulator - Collect unique values
Use set_accumulator to collect unique values across chunks:
from coffea.processor import set_accumulator
import awkward as ak
def process(self, events):
dataset = events.metadata["dataset"]
# Collect unique run numbers
unique_runs = set_accumulator(set(ak.to_numpy(events.run)))
return {
dataset: {
"entries": len(events),
"runs": unique_runs,
}
}
defaultdict_accumulator - Simple counters
Use defaultdict_accumulator for counters that automatically initialize:
from coffea.processor import defaultdict_accumulator
import awkward as ak


def process(self, events):
    """Build a simple muon-multiplicity cutflow for one chunk.

    Counts all events, events with at least one muon, and events with at
    least two muons.
    """
    dataset = events.metadata["dataset"]

    # defaultdict_accumulator(int): missing keys start at 0, so counters can
    # be incremented without initializing them first.
    cutflow = defaultdict_accumulator(int)
    n_muons = ak.num(events.Muon, axis=1)  # muons per event, computed once

    cutflow["total"] += len(events)
    cutflow["has_muon"] += int(ak.sum(n_muons > 0))
    cutflow["has_2muon"] += int(ak.sum(n_muons >= 2))

    return {
        dataset: {
            "cutflow": cutflow,
        }
    }
value_accumulator - Accumulate arbitrary types
Use value_accumulator to accumulate values with custom types:
from coffea.processor import value_accumulator
import awkward as ak


def process(self, events):
    """Return this chunk's sum of generator weights as a ``value_accumulator``."""
    dataset = events.metadata["dataset"]

    # Track sum of weights: the first argument is the value type (float),
    # the second is this chunk's value.
    sumw = value_accumulator(float, float(ak.sum(events.genWeight)))

    return {
        dataset: {
            "sumw": sumw,
        }
    }
Tips & tricks
- Return plain dictionaries - the framework handles merging automatically
- Use `hist.Hist` for virtual/eager modes and `hist.dask.Hist` for dask mode (`hist.dask` must be imported explicitly, e.g. `import hist.dask`)
- For simple cutflow counters, use regular Python dicts with integer values or `defaultdict_accumulator(int)`
- Use accumulator classes when you need specialized merging behavior (concatenation, union, etc.)
- Avoid early returns like `if len(events) == 0: return` - always return the full dictionary structure
- Test with `processor.IterativeExecutor` first to validate the output structure
- For large numpy arrays (>100 MB), consider writing them to disk inside `process` instead of using `column_accumulator`