Runner

class coffea.processor.Runner(executor: coffea.processor.executor.ExecutorBase, pre_executor: coffea.processor.executor.ExecutorBase | None = None, chunksize: int = 100000, maxchunks: int | None = None, metadata_cache: collections.abc.MutableMapping | None = None, skipbadfiles: bool | tuple[type[BaseException], ...] = False, xrootdtimeout: int | None = 60, align_clusters: bool = False, savemetrics: bool = False, schema: coffea.nanoevents.schemas.base.BaseSchema | None = <class 'coffea.nanoevents.schemas.nanoaod.NanoAODSchema'>, processor_compression: int = 1, format: str = 'root', checkpointer: coffea.processor.checkpointer.CheckpointerABC | None = None, cachestrategy: None | typing.Literal['dask-worker'] | collections.abc.Callable[[...], collections.abc.MutableMapping] = None)

Bases: object

A tool to run a processor using uproot for data delivery.

A convenience wrapper to submit jobs for a fileset, which is a dictionary of {dataset: [file list]} entries. Supports only uproot TTree reading, via NanoEvents. For more customized processing, e.g. to read other objects from the files and pass them into data frames, you can write a similar function in your own user code.

Parameters:
  • executor (ExecutorBase instance) – Executor implementing a callable that takes items, function, and accumulator, and performs an action equivalent to: for item in items: accumulator += function(item)

  • pre_executor (ExecutorBase instance) – Executor used to calculate fileset metadata. Defaults to executor.

  • chunksize (int, optional) – Maximum number of entries to process at a time in the data frame. Default: 100k.

  • maxchunks (int, optional) – Maximum number of chunks to process per dataset. Defaults to processing the whole dataset.

  • metadata_cache (mapping, optional) – A dict-like object to use as a cache for (file, tree) metadata that is used to determine chunking. Defaults to an in-memory LRU cache that holds 100k entries (about 1MB, depending on the length of filenames, etc.). If you edit an input file during a session (please don’t), restart the session to clear the cache.

  • checkpointer (CheckpointerABC, optional) – A CheckpointerABC instance to manage checkpointing of each chunk's output.
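As a minimal sketch of the executor contract described above, the hypothetical SerialExecutorSketch class below applies function to each item and folds each result into accumulator with +=. It is not one of coffea's executor classes and models nothing beyond that loop; collections.Counter stands in for an accumulator only because it supports +=.

```python
from collections import Counter
from typing import Any, Callable, Iterable


class SerialExecutorSketch:
    """Hypothetical executor: for item in items: accumulator += function(item)."""

    def __call__(self, items: Iterable[Any], function: Callable[[Any], Any], accumulator: Any) -> Any:
        for item in items:
            # Fold each per-item result into the running accumulator.
            accumulator += function(item)
        return accumulator


def count_items(chunk: list) -> Counter:
    # Stand-in for processing one chunk: count occurrences of each label.
    return Counter(chunk)


executor = SerialExecutorSketch()
result = executor([["mu", "e"], ["mu"], ["tau", "mu"]], count_items, Counter())
print(result["mu"], result["e"], result["tau"])  # 3 1 1
```

Any accumulator type whose += merges partial results in an order-independent way would fit the same loop, which is what lets a real executor distribute the items.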


Methods Summary

  • __call__(fileset, processor_instance, *[, ...]) – Run the processor_instance on a given fileset.

  • automatic_retries(retries, skipbadfiles, ...) – This should probably be defined at the Executor level.

  • get_cache(cachestrategy)

  • metadata_fetcher_parquet(item)

  • metadata_fetcher_root(xrootdtimeout, ...)

  • preprocess(fileset, *[, treename, ...]) – Preprocess the fileset and generate work items.

  • run(fileset, processor_instance, *[, ...]) – Run the processor_instance on a given fileset.

Attributes Documentation

align_clusters: bool = False
cachestrategy: None | Literal['dask-worker'] | Callable[[...], MutableMapping] = None
checkpointer: CheckpointerABC | None = None
chunksize: int = 100000
executor: ExecutorBase (required; no default)
format: str = 'root'
maxchunks: int | None = None
metadata_cache: MutableMapping | None = None
pre_executor: ExecutorBase | None = None
processor_compression: int = 1
retries
savemetrics: bool = False
skipbadfiles: bool | tuple[type[BaseException], ...] = False
use_dataframes
xrootdtimeout: int | None = 60
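Since metadata_cache only needs to be a dict-like MutableMapping, a bounded cache can also be supplied by the user. The LRUMapping class below is a hypothetical sketch built on collections.OrderedDict, not the cache coffea creates by default; the (file, tree) keys and the metadata values in the usage lines are made-up placeholders.

```python
from collections import OrderedDict
from collections.abc import MutableMapping
from typing import Any, Hashable, Iterator


class LRUMapping(MutableMapping):
    """Hypothetical bounded mapping that evicts the least recently used key."""

    def __init__(self, maxsize: int = 100_000) -> None:
        self.maxsize = maxsize
        self._data = OrderedDict()

    def __getitem__(self, key: Hashable) -> Any:
        value = self._data[key]       # raises KeyError on a miss
        self._data.move_to_end(key)   # mark as most recently used
        return value

    def __setitem__(self, key: Hashable, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self.maxsize:
            self._data.popitem(last=False)  # drop the least recently used entry

    def __delitem__(self, key: Hashable) -> None:
        del self._data[key]

    def __iter__(self) -> Iterator[Hashable]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)


cache = LRUMapping(maxsize=2)
cache[("a.root", "Events")] = {"numentries": 1000}
cache[("b.root", "Events")] = {"numentries": 2000}
_ = cache[("a.root", "Events")]                     # touch a.root so it is most recent
cache[("c.root", "Events")] = {"numentries": 3000}  # evicts b.root
print(sorted(f for f, _ in cache))  # ['a.root', 'c.root']
```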

Methods Documentation

__call__(fileset: dict, processor_instance: ProcessorABC | Callable[[awkward.Array], Any], *, treename: str | None = None, uproot_options: dict | None = {}, iteritems_options: dict | None = {}, trace: Callable | None = None) → Addable | MutableSet | MutableMapping

Run the processor_instance on a given fileset.

Parameters:
  • fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files use a different tree name, the dictionary can be specified per dataset as {dataset: {'treename': 'name', 'files': [file, file]}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }.

  • processor_instance (ProcessorABC or Callable) – An instance of a class deriving from ProcessorABC, or a single-argument callable.

  • treename (str) – Name of the tree inside each ROOT file; can be None. A treename defined in the fileset overrides the one passed here.

  • uproot_options (dict, optional) – Any options to pass to uproot.open.

  • iteritems_options (dict, optional) – Any options to pass to tree.iteritems.

  • trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments — a processing function (accepting events) and a NanoEvents array — and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided and preprocessing is needed, tracing is performed and takes precedence over fileset-level preload.
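The trace contract (a callable taking a processing function and an events array, returning column names) can be illustrated without NanoEvents. In the sketch below, RecordingEvents and toy_trace are hypothetical: a plain dict of lists stands in for the events array, and each column lookup is recorded. It only illustrates the calling convention and is not coffea.nanoevents.trace.trace.

```python
from typing import Any, Callable, Iterable, Mapping


class RecordingEvents:
    """Hypothetical stand-in for an events array that remembers which columns were read."""

    def __init__(self, columns: Mapping[str, Any]) -> None:
        self._columns = dict(columns)
        self.accessed = []

    def __getitem__(self, name: str) -> Any:
        if name not in self.accessed:
            self.accessed.append(name)
        return self._columns[name]


def toy_trace(function: Callable[[Any], Any], events: Mapping[str, Any]) -> Iterable[str]:
    """Run function once on a recording wrapper and return the columns it read."""
    recorder = RecordingEvents(events)
    function(recorder)
    return list(recorder.accessed)


def process(events):
    # Example processing function: reads two columns and ignores Jet_pt.
    return sum(pt for pt in events["Muon_pt"] if pt > 20) + len(events["Electron_pt"])


sample = {"Muon_pt": [10.0, 25.0, 40.0], "Electron_pt": [5.0], "Jet_pt": [50.0]}
print(toy_trace(process, sample))  # ['Muon_pt', 'Electron_pt']
```

A list of column names like this is the same kind of information that fileset-level 'preload' supplies by hand, which is why a trace result can take its place.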

static automatic_retries(retries: int, skipbadfiles: bool | tuple[type[BaseException], ...], func, *args, **kwargs)

This should probably be defined at the Executor level.
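The name and signature of automatic_retries suggest a retry wrapper around func(*args, **kwargs). The retry_call function below is a hypothetical, simplified sketch of that general pattern, not coffea's implementation: it retries up to retries extra times and, after the last attempt, returns None for exception types listed in skip instead of raising. It only accepts a tuple of exception types; how coffea handles skipbadfiles=True is not modeled here.

```python
import warnings
from typing import Any, Callable


def retry_call(retries: int, skip: tuple, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Hypothetical retry wrapper: call func, retrying on any exception."""
    for attempt in range(retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            if attempt < retries:
                warnings.warn(f"Attempt {attempt + 1} of {retries + 1} failed: {exc}")
                continue
            if skip and isinstance(exc, skip):
                # Downgrade a "bad file" error to a warning and skip the item.
                warnings.warn(f"Skipping after {retries + 1} attempts: {exc}")
                return None
            raise


calls = {"n": 0}


def flaky_open(path: str) -> str:
    # Fails on the first two calls, then succeeds.
    calls["n"] += 1
    if calls["n"] < 3:
        raise OSError(f"transient failure opening {path}")
    return f"opened {path}"


with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    print(retry_call(2, (OSError,), flaky_open, "a.root"))  # opened a.root
```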

static get_cache(cachestrategy)
static metadata_fetcher_parquet(item: FileMeta)
static metadata_fetcher_root(xrootdtimeout: int, align_clusters: bool, uproot_options: dict, item: FileMeta) → Addable | MutableSet | MutableMapping
preprocess(fileset: dict, *, treename: str | None = None, uproot_options: dict | None = {}, trace: Callable | None = None, processor_instance: ProcessorABC | Callable | None = None) → Generator

Preprocess the fileset and generate work items.

Parameters:
  • fileset (dict) – A dictionary {dataset: [file, file], }. Optionally, if some files use a different tree name, the dictionary can be specified per dataset as {dataset: {'treename': 'name', 'files': [file, file]}, }, or with a tree name per file as {dataset: {'files': {file: 'name'}}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }.

  • treename (str) – Name of the tree inside each ROOT file; can be None. A treename defined in the fileset overrides the one passed here.

  • uproot_options (dict, optional) – Any options to pass to uproot.open.

  • trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments — a processing function (accepting events) and a NanoEvents array — and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided, processor_instance must also be given. Tracing is performed using the first openable file per dataset and the result overrides any preload specified in the fileset. If None, no tracing is performed and only fileset-level preload (if any) is used.

  • processor_instance (ProcessorABC or Callable, optional) – The processor whose column access will be traced. Required when trace is provided.
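To make the accepted fileset shapes concrete, the hypothetical expand_fileset function below flattens each dictionary form into one record per file. It is a simplified sketch of how the documented overrides could be resolved (a per-file tree name, else the dataset 'treename', else the passed treename), not the logic inside preprocess; FileEntry and all file names are placeholders.

```python
from typing import NamedTuple, Optional


class FileEntry(NamedTuple):
    # Hypothetical flat record; coffea's own work items carry more fields.
    dataset: str
    filename: str
    treename: Optional[str]
    preload: tuple


def expand_fileset(fileset: dict, treename: Optional[str] = None) -> list:
    """Flatten the documented fileset dictionary shapes (sketch only)."""
    entries = []
    for dataset, spec in fileset.items():
        if isinstance(spec, (list, tuple)):
            # {dataset: [file, file]}: every file uses the passed treename
            entries.extend(FileEntry(dataset, fn, treename, ()) for fn in spec)
            continue
        # dict form: a dataset-level 'treename' overrides the passed treename
        ds_tree = spec.get("treename", treename)
        preload = tuple(spec.get("preload", ()))
        files = spec["files"]
        if isinstance(files, dict):
            # {dataset: {'files': {file: 'name'}}}: one tree name per file
            entries.extend(FileEntry(dataset, fn, tree, preload) for fn, tree in files.items())
        else:
            entries.extend(FileEntry(dataset, fn, ds_tree, preload) for fn in files)
    return entries


fileset = {
    "DY": ["dy_1.root", "dy_2.root"],
    "TTbar": {"treename": "Events", "files": ["tt_1.root"]},
    "Data": {"files": {"data_1.root": "Events", "data_2.root": "Tree"}},
    "WJets": {"preload": ["nMuon", "Muon_pt"], "files": ["wj_1.root"]},
}
for entry in expand_fileset(fileset, treename="Events"):
    print(entry)
```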

run(fileset: dict | str | list[WorkItem] | Generator, processor_instance: ProcessorABC | Callable[[awkward.Array], Any], *, treename: str | None = None, uproot_options: dict | None = {}, iteritems_options: dict | None = {}, trace: Callable | None = None) → Addable | MutableSet | MutableMapping

Run the processor_instance on a given fileset.

Parameters:
  • fileset (dict | str | List[WorkItem] | Generator) –

    Fileset can be one of the following:

    • A dictionary {dataset: [file, file], }. Optionally, if some files use a different tree name, the dictionary can be specified per dataset as {dataset: {'treename': 'name', 'files': [file, file]}, }, or with a tree name per file as {dataset: {'files': {file: 'name'}}, }. You can also define branches to preload per dataset: {dataset: {'preload': ['branch1', 'branch2'], 'files': [file, file]}, }.

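    • A single file name

    • A list of file chunks (WorkItem objects), such as those produced by self.preprocess()

    • A chunk generator, such as the one returned by self.preprocess()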

  • processor_instance (ProcessorABC or Callable) – An instance of a class deriving from ProcessorABC, or a single-argument callable.

  • treename (str, optional) – Name of the tree inside each ROOT file; can be None. A treename defined in the fileset overrides the one passed here. Not needed when processing premade chunks.

  • uproot_options (dict, optional) – Any options to pass to uproot.open.

  • iteritems_options (dict, optional) – Any options to pass to tree.iteritems.

  • trace (Callable, optional) – A tracing function that determines which columns a processing function accesses. It takes two arguments — a processing function (accepting events) and a NanoEvents array — and returns an iterable of column name strings. See coffea.nanoevents.trace.trace for the default implementation. When provided and preprocessing is needed, tracing is performed and takes precedence over fileset-level preload.
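The premade-chunk forms of fileset (a list of chunks or a chunk generator) can be pictured with plain Python. In the sketch below, ChunkStub and make_chunks are hypothetical: ChunkStub is a reduced stand-in for WorkItem, and make_chunks uses a simple fixed-size split of at most chunksize entries, capped by maxchunks, which may differ from how coffea actually sizes its chunks. coffea applies maxchunks per dataset; this one-file sketch applies it per file.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional


@dataclass(frozen=True)
class ChunkStub:
    # Hypothetical stand-in for coffea's WorkItem; the real class has more fields.
    dataset: str
    filename: str
    entrystart: int
    entrystop: int


def make_chunks(dataset: str, filename: str, numentries: int,
                chunksize: int = 100_000, maxchunks: Optional[int] = None) -> Iterator[ChunkStub]:
    """Yield fixed-size [start, stop) entry ranges (simplified; not coffea's algorithm)."""
    for i, start in enumerate(range(0, numentries, chunksize)):
        if maxchunks is not None and i >= maxchunks:
            break
        yield ChunkStub(dataset, filename, start, min(start + chunksize, numentries))


def count_entries(chunks: Iterable[ChunkStub]) -> int:
    # Works on a list of chunks or on a generator, mirroring the two
    # premade-chunk forms; neither needs a treename at this stage.
    return sum(c.entrystop - c.entrystart for c in chunks)


premade = list(make_chunks("DY", "dy_1.root", numentries=250_000))
print([(c.entrystart, c.entrystop) for c in premade])
# [(0, 100000), (100000, 200000), (200000, 250000)]
print(count_entries(premade))  # 250000
print(count_entries(make_chunks("DY", "dy_1.root", 250_000, maxchunks=2)))  # 200000
```

A list can be inspected or reused, while a generator produces chunks lazily and can only be consumed once, which is the practical difference between the last two accepted forms.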