Getting Started#
Coffea couples the columnar data model of Awkward Array with a thin execution layer so that an analysis can move from a laptop to a cluster without rewrites. The workflow always follows the same pattern:
Implement a
coffea.processor.ProcessorABCthat turns NanoEvents into accumulators.Execute it with
coffea.processor.Runnerusing a local executor while you iterate.Swap the executor when you are ready to scale out.
Below is a minimal processor that applies muon scale factors from correctionlib and produces a histogram.
import awkward as ak
import correctionlib
import hist
from coffea import processor
class MuonProcessor(processor.ProcessorABC):
def __init__(self, sf_path: str):
self.corrections = correctionlib.CorrectionSet.from_file(sf_path)
self.muon_sf = self.corrections["muon_sf"]
def process(self, events):
dataset = events.metadata["dataset"]
# Create histogram with category axis
h_mass = hist.Hist.new.StrCat([], growth=True, name="dataset").Reg(
60, 60, 120, name="mass", label="mμμ [GeV]"
).Weight()
# select OS dimuons
muons = events.Muon[events.Muon.tightId]
dimuons = ak.combinations(muons, 2, fields=["lead", "trail"])
dimuons = dimuons[dimuons.lead.charge != dimuons.trail.charge]
# correctionlib returns per-muon weights; take product per event
sf_lead = self.muon_sf.evaluate(dimuons.lead.eta, dimuons.lead.pt)
sf_trail = self.muon_sf.evaluate(dimuons.trail.eta, dimuons.trail.pt)
event_weight = sf_lead * sf_trail
mass = (dimuons.lead + dimuons.trail).mass
h_mass.fill(
dataset=dataset,
mass=mass,
weight=event_weight,
)
return {
dataset: {
"mass": h_mass,
"events": len(events),
}
}
def postprocess(self, accumulator):
return accumulator
Run locally#
from coffea import processor
from coffea.nanoevents import NanoAODSchema
fileset = {
"DYJets": {
"files": {"nano_dy.root": "Events"},
"metadata": {"is_mc": True},
},
"Data": {
"files": {"nano_data.root": "Events"},
"metadata": {"is_mc": False},
},
}
runner = processor.Runner(
executor=processor.IterativeExecutor(),
schema=NanoAODSchema,
savemetrics=True,
)
result, metrics = runner(fileset, processor_instance=MuonProcessor("muon_sf.json.gz"))
result is a nested accumulator that includes histograms and cutflow counters. The metrics dictionary captures runtime information such as bytes read and columns touched.
Scale out#
Scaling does not require modifying the processor. Replace the executor and, if needed, provide configuration for the backing service.
from dask.distributed import Client
client = Client("tcp://scheduler:8786")
cluster_runner = processor.Runner(
executor=processor.DaskExecutor(client=client),
schema=NanoAODSchema,
savemetrics=True,
)
result_cluster, metrics_cluster = cluster_runner(
fileset, processor_instance=MuonProcessor("muon_sf.json.gz")
)
You can follow the same pattern with FuturesExecutor, ParslExecutor, or TaskVineExecutor. See Coffea concepts for background on processors and executors.