# Source code for scannerpy.op

import grpc
import copy
import pickle
import types
import uuid

from scannerpy.common import *
from scannerpy.protobuf_generator import python_to_proto
from typing import Dict, List, Union, Tuple, Optional, Sequence
from inspect import signature
from itertools import islice
from collections import OrderedDict
from functools import wraps


class OpColumn:
    """Handle to a single column produced by an Op.

    Stores the owning database handle, the producing op, the column name and
    the column type. Columns whose type equals ``db.protobufs.Video`` also
    carry a dict of encode options (initially ``{'codec': 'default'}``); for
    every other type the encode options are ``None``.
    """

    def __init__(self, db, op, col, typ):
        self._db = db
        self._op = op
        self._col = col
        self._type = typ
        self._encode_options = None
        if self._type == self._db.protobufs.Video:
            self._encode_options = {'codec': 'default'}

    def compress(self, codec='video', **kwargs):
        """Return a copy of this column using the named compression codec.

        Parameters
        ----------
        codec
          One of ``'video'``, ``'default'`` or ``'raw'``.

        **kwargs
          Forwarded to the codec-specific method (`compress_video`,
          `compress_default` or `lossless`).

        Raises
        ------
        ScannerException
          If the column is not a video column or the codec is unknown.
        """
        self._assert_is_video()
        codecs = {
            'video': self.compress_video,
            'default': self.compress_default,
            'raw': self.lossless
        }
        if codec in codecs:
            # The table holds bound methods, so `self` must not be passed
            # a second time.
            return codecs[codec](**kwargs)
        else:
            raise ScannerException('Compression codec {} not currently '
                                   'supported. Available codecs are: {}.'
                                   .format(codec, ' '.join(codecs)))

    def compress_video(self, quality=-1, bitrate=-1, keyframe_distance=-1):
        """Return a copy of this column with 'h264' encode options.

        The three parameters are stored verbatim in the encode options.
        NOTE(review): -1 presumably means "use the encoder default" --
        confirm against the encoder implementation.
        """
        self._assert_is_video()
        encode_options = {
            'codec': 'h264',
            'quality': quality,
            'bitrate': bitrate,
            'keyframe_distance': keyframe_distance
        }
        return self._new_compressed_column(encode_options)

    def lossless(self):
        """Return a copy of this column with the 'raw' codec selected."""
        self._assert_is_video()
        encode_options = {'codec': 'raw'}
        return self._new_compressed_column(encode_options)

    def compress_default(self):
        """Return a copy of this column with the 'default' codec selected."""
        self._assert_is_video()
        encode_options = {'codec': 'default'}
        return self._new_compressed_column(encode_options)

    def _assert_is_video(self):
        """Raise ScannerException unless this column has the video type."""
        if self._type != self._db.protobufs.Video:
            raise ScannerException('Compression only supported for columns of '
                                   'type "video". Column {} type is {}.'.format(
                                       self._col,
                                       self._db.protobufs.ColumnType.Name(
                                           self._type)))

    def _new_compressed_column(self, encode_options):
        """Clone this column and attach `encode_options` to the clone."""
        new_col = OpColumn(self._db, self._op, self._col, self._type)
        new_col._encode_options = encode_options
        return new_col
# Module-level registry of Python ops declared with `register_python_op`.
# Keys are op names; each value is a dict holding the op's input/output
# columns, stencil/state settings, wrapped kernel, device info, batch size,
# proto path and a unique 'registration_id'. `OpGenerator.__getattr__`
# consults this registry to register such ops with the database on first use.
PYTHON_OP_REGISTRY = {}
class OpGenerator:
    """
    Creates Op instances to define a computation.

    When a particular op is requested from the generator, e.g.
    `db.ops.Histogram`, the generator does a dynamic lookup for the op in a
    C++ registry. Ops found in `PYTHON_OP_REGISTRY` are first registered with
    the database under the pseudo-name ``<name>:<registration_id>``.
    """

    def __init__(self, db):
        self._db = db

    def _ensure_python_op_registered(self, name):
        """Register the Python op `name` with the database if it is not
        registered yet, and return the pseudo-name it is registered under."""
        info = PYTHON_OP_REGISTRY[name]
        pseudo_name = name + ':' + info['registration_id']
        if pseudo_name in self._db._python_ops:
            return pseudo_name

        # Collect every device a kernel must be registered for.
        devices = []
        if info['device_type']:
            devices.append(info['device_type'])
        if info['device_sets']:
            devices.extend(device_set[0] for device_set in info['device_sets'])

        self._db.register_op(
            pseudo_name,
            info['input_columns'],
            info['output_columns'],
            info['variadic_inputs'],
            info['stencil'],
            info['unbounded_state'],
            info['bounded_state'],
            info['proto_path'])
        for device in devices:
            self._db.register_python_kernel(pseudo_name, device,
                                            info['kernel'], info['batch'])
        return pseudo_name

    def __getattr__(self, name):
        # Python ops are looked up (and lazily registered) under a pseudo-name.
        if name in PYTHON_OP_REGISTRY:
            op_name = self._ensure_python_op_registered(name)
        else:
            op_name = name

        # This will raise an exception if the op does not exist.
        op_info = self._db._get_op_info(op_name)

        def make_op(*args, **kwargs):
            if op_info.variadic_inputs:
                inputs = list(args)
            else:
                inputs = []
                for column in op_info.input_columns:
                    value = kwargs.pop(column.name, None)
                    if value is None:
                        raise ScannerException(
                            'Op {} required column {} as input'.format(
                                op_name, column.name))
                    inputs.append(value)

            # Reserved keywords are stripped; whatever remains in `kwargs`
            # becomes the kernel arguments unless `args=` is given explicitly.
            device = kwargs.pop('device', DeviceType.CPU)
            batch = kwargs.pop('batch', -1)
            bounded_state = kwargs.pop('bounded_state', -1)
            stencil = kwargs.pop('stencil', [])
            extra = kwargs.pop('extra', None)
            explicit_args = kwargs.pop('args', None)
            kernel_args = kwargs if explicit_args is None else explicit_args

            op = Op(self._db, op_name, inputs, device, batch, bounded_state,
                    stencil, kernel_args, extra)
            return op.outputs()

        return make_op
class Op:
    """A single operation in a computation graph.

    Parameters
    ----------
    db
      Database handle; used to look up output columns, op info and protobufs.

    name
      Name of the op.

    inputs
      Input columns (objects exposing ``_op``, ``_col`` and ``_type``).

    device
      Device the op runs on; converted with `DeviceType.to_proto` when
      serializing.

    batch
      Written to the proto's ``batch`` field.

    warmup
      Written to the proto's ``warmup`` field.

    stencil
      Stencil offsets; defaults to ``[0]`` when omitted.

    args
      Kernel arguments: either a dict or a protobuf message. Defaults to an
      empty dict when omitted.

    extra
      Stored on the instance; not used by the code in this class.
    """

    # Ops that pass their input columns straight through as outputs.
    _PASSTHROUGH_OPS = ('Space', 'Sample', 'Slice', 'Unslice')

    def __init__(self,
                 db,
                 name,
                 inputs,
                 device,
                 batch=-1,
                 warmup=-1,
                 stencil=None,
                 args=None,
                 extra=None):
        self._db = db
        self._name = name
        self._inputs = inputs
        self._device = device
        self._batch = batch
        self._warmup = warmup
        # Fresh containers per instance: a literal default would be shared
        # (and mutable) across every Op created without these arguments.
        self._stencil = [0] if stencil is None else stencil
        self._args = {} if args is None else args
        self._extra = extra

        if name in self._PASSTHROUGH_OPS:
            outputs = [OpColumn(db, self, c._col, c._type) for c in inputs]
        else:
            cols = self._db._get_output_columns(self._name)
            outputs = [OpColumn(self._db, self, c.name, c.type) for c in cols]
        self._outputs = outputs

    def inputs(self):
        """Return the input columns this op was constructed with."""
        return self._inputs

    def outputs(self):
        """Return the single output column, or a tuple of columns when the op
        does not have exactly one output."""
        if len(self._outputs) == 1:
            return self._outputs[0]
        else:
            return tuple(self._outputs)

    def to_proto(self, indices):
        """Serialize this op into a ``protobufs.Op`` message.

        Parameters
        ----------
        indices
          Mapping from producing op to its index; used to fill in
          ``op_index`` for every input. Inputs without a producing op get -1.
        """
        e = self._db.protobufs.Op()
        e.name = self._name
        e.device_type = DeviceType.to_proto(self._db.protobufs, self._device)
        e.stencil.extend(self._stencil)
        e.batch = self._batch
        e.warmup = self._warmup

        if e.name == "Input":
            inp = e.inputs.add()
            inp.column = self._inputs[0]._col
            inp.op_index = -1
        else:
            for i in self._inputs:
                inp = e.inputs.add()
                idx = indices[i._op] if i._op is not None else -1
                inp.op_index = idx
                inp.column = i._col

        if isinstance(self._args, dict):
            if self._name in self._db._python_ops:
                # Python kernels receive their arguments as a pickled dict.
                e.kernel_args = pickle.dumps(self._args)
            elif len(self._args) > 0:
                # To convert an arguments dict, we search for a protobuf with
                # the name {Op}Args (e.g. BlurArgs, HistogramArgs) in the
                # args.proto module, and fill that in with keys from the args
                # dict.
                op_info = self._db._get_op_info(self._name)
                if len(op_info.protobuf_name) > 0:
                    proto_name = op_info.protobuf_name
                    e.kernel_args = python_to_proto(self._db.protobufs,
                                                    proto_name, self._args)
                else:
                    # NOTE(review): every other branch assigns serialized
                    # bytes; assigning the raw dict here looks like it would
                    # be rejected by the protobuf field -- confirm intent.
                    e.kernel_args = self._args
        else:
            # If arguments are a protobuf object, serialize it directly
            e.kernel_args = self._args.SerializeToString()

        return e
def register_python_op(name: str = None,
                       stencil: List[int] = None,
                       unbounded_state: bool = False,
                       bounded_state: int = None,
                       device_type: DeviceType = None,
                       device_sets: List[Tuple[DeviceType, int]] = None,
                       batch: int = 1,
                       proto_path: str = None):
    r"""Class or function decorator which registers a new Op and Kernel with
    the Scanner master.

    The decorated function (or the ``execute`` method of the decorated class)
    is inspected: its parameter annotations determine the input columns and
    its return annotation determines the output columns (named ``ret0``,
    ``ret1``, ...). The result is stored in ``PYTHON_OP_REGISTRY``; the
    decorated object itself is returned unchanged.

    Parameters
    ----------
    name
      Optional name for the Op. By default, it will be inferred as the name
      of the decorated class/kernel.

    stencil
      Specifies the default stencil to use for the Op. If none, indicates
      that the Op does not have the ability to stencil. A stencil of [0]
      should be specified if the Op can stencil but should not by default.

    unbounded_state
      If true, indicates that the Op needs to see all previous elements of
      its input sequences before it can compute a given element. For example,
      to compute output element at index 100, the Op must have already
      produced elements 0-99. This option is mutually exclusive with
      `bounded_state`.

    bounded_state
      Optional integer stored in the registry entry and passed on when the
      Op is registered with the database. NOTE(review): presumably the number
      of preceding elements the Op must see before computing a given element
      -- confirm. This option is mutually exclusive with `unbounded_state`.

    device_type
      Device the kernel is registered for. Defaults to `DeviceType.CPU` when
      neither `device_type` nor `device_sets` is given. Mutually exclusive
      with `device_sets`.

    device_sets
      List of ``(DeviceType, int)`` tuples; a kernel is registered for the
      device type of each entry. Mutually exclusive with `device_type`.

    batch
      Batch size for the kernel. A value greater than 1 marks the Op as
      batched, in which case every input and output annotation must be a
      ``Sequence[T]``.

    proto_path
      Optional path to the proto file that describes the configuration
      arguments to this Op.

    Raises
    ------
    ScannerException
      If the kernel signature is unsupported or mis-annotated, if an Op with
      the same name is already registered, or if both `device_type` and
      `device_sets` are given.
    """
    # Imported locally so this function does not rely on changes to the
    # file-level import block.
    import collections.abc

    def dec(fn_or_class):
        is_fn = isinstance(
            fn_or_class, (types.FunctionType, types.BuiltinFunctionType))

        if name is None:
            # Infer name from fn_or_class name
            kname = fn_or_class.__name__
        else:
            kname = name

        can_stencil = stencil is not None
        can_batch = batch > 1

        # Get execute function to determine input and output types
        if is_fn:
            exec_fn = fn_or_class
        else:
            exec_fn = getattr(fn_or_class, "execute", None)
            if not callable(exec_fn):
                raise ScannerException(
                    ('Attempted to register Python Op with name {:s}, but that '
                     'provided class has no "execute" method.').format(kname))

        input_columns = []
        has_variadic_inputs = False

        sig = signature(exec_fn)
        # The first parameter is `config` for function kernels and `self` for
        # class kernels; it is never an input column, so drop it either way.
        fn_params = OrderedDict(islice(sig.parameters.items(), 1, None))

        def unwrap_sequence(typ, error_message):
            # Return T for an annotation of the form Sequence[T].
            # `__origin__` is typing.Sequence on Python 3.6 but
            # collections.abc.Sequence on 3.7+, so accept both.
            origin = getattr(typ, '__origin__', None)
            params = getattr(typ, '__args__', None)
            is_sequence = (origin is Sequence
                           or origin is collections.abc.Sequence)
            if not is_sequence or not params:
                raise ScannerException(error_message)
            return params[0]

        def parse_annotation_to_column_type(typ,
                                            is_input=False,
                                            column_name=''):
            if can_batch:
                # If the op can batch, then we expect the types to be
                # Sequence[T], where T = {bytes, FrameType}
                typ = unwrap_sequence(
                    typ,
                    ('A batched Op must specify a "Sequence" type '
                     'annotation for each input and output.'))

            if is_input and can_stencil:
                # If the op can stencil, then we expect the input types to be
                # Sequence[T], where T = {bytes, FrameType}
                typ = unwrap_sequence(
                    typ,
                    ('A stenciled Op must specify a "Sequence" type '
                     'annotation for each input. If the Op both stencils '
                     'and batches, then it should have the type '
                     '"Sequence[Sequence[T]], where T = {bytes, FrameType}.'))

            if typ == bytes:
                column_type = ColumnType.Blob
            elif typ == FrameType:
                column_type = ColumnType.Video
            else:
                # Name the column explicitly instead of reading the enclosing
                # loop variable, which is wrong (or unbound) for outputs.
                raise ScannerException(
                    ('Invalid type annotation specified for {:s} {:s}. Must '
                     'specify an annotation of type "bytes" or '
                     '"FrameType".').format(
                         'input' if is_input else 'output', column_name))
            return column_type

        # Analyze exec_fn parameters to determine the input types
        for param_name, param in fn_params.items():
            # We only allow keyword arguments and *args.
            # There is no support currently for positional or **kwargs
            kind = param.kind
            if (kind == param.POSITIONAL_ONLY or kind == param.VAR_KEYWORD):
                raise ScannerException(
                    ('Positional arguments and **kwargs are currently not '
                     'supported for the "execute" method of kernels'))

            if kind == param.VAR_POSITIONAL:
                # This means we have variadic inputs
                has_variadic_inputs = True
                if len(fn_params) > 1:
                    raise ScannerException(
                        ('Variadic positional inputs (*args) are not supported '
                         'when used with other inputs.'))
                break

            if param.annotation == param.empty:
                raise ScannerException(
                    ('No type annotation specified for input {:s}. Must '
                     'specify an annotation of "bytes" or "FrameType".')
                    .format(param_name))

            column_type = parse_annotation_to_column_type(
                param.annotation, is_input=True, column_name=param_name)
            input_columns.append((param_name, column_type))

        output_columns = []
        # Analyze exec_fn return type to determine output types
        typ = sig.return_annotation
        if typ == sig.empty:
            raise ScannerException(
                ('Return annotation must be specified for "execute" method.'))

        return_is_tuple = True
        origin = getattr(typ, '__origin__', None)
        # `__origin__` is typing.Tuple on Python 3.5/3.6 and the builtin
        # `tuple` on 3.7+.
        if origin is not None and (origin is tuple or origin == Tuple):
            if getattr(typ, '__tuple_params__', None):
                # Python 3.5
                use_ellipsis = typ.__tuple_use_ellipsis__
                tuple_params = typ.__tuple_params__
            elif getattr(typ, '__args__', None):
                # Python 3.6+
                use_ellipsis = typ.__args__[-1] is Ellipsis
                tuple_params = typ.__args__[:-1 if use_ellipsis else None]
            else:
                raise ScannerException('This should not happen...')
        else:
            use_ellipsis = False
            return_is_tuple = False
            tuple_params = [typ]

        if use_ellipsis:
            raise ScannerException(
                ('Ellipsis tuples not supported for return type.'))

        # Parse the return types into Scanner column types
        for i, typ in enumerate(tuple_params):
            column_name = 'ret{:d}'.format(i)
            column_type = parse_annotation_to_column_type(
                typ, column_name=column_name)
            output_columns.append((column_name, column_type))

        if kname in PYTHON_OP_REGISTRY:
            raise ScannerException(
                'Attempted to register Op with name {:s} twice'.format(kname))

        def parse_ret(r):
            # Kernels with a single (non-tuple) return are normalized to a
            # one-element tuple.
            if return_is_tuple:
                return r
            else:
                return (r, )

        # Wrap exec_fn to destructure input and outputs to proper python inputs
        if is_fn:
            if has_variadic_inputs:

                @wraps(fn_or_class)
                def wrapper_exec(config, in_cols):
                    return parse_ret(exec_fn(config, *in_cols))
            else:

                @wraps(fn_or_class)
                def wrapper_exec(config, in_cols):
                    args = {}
                    for (param_name, _), c in zip(input_columns, in_cols):
                        args[param_name] = c
                    return parse_ret(exec_fn(config, **args))

            wrapped_fn_or_class = wrapper_exec
        else:
            # Build a sibling class so the user's class is left untouched
            # while the registered kernel gets the wrapped `execute`.
            wrapped_fn_or_class = type(fn_or_class.__name__ + 'Kernel',
                                       fn_or_class.__bases__,
                                       dict(fn_or_class.__dict__))
            if has_variadic_inputs:

                def execute(self, in_cols):
                    return parse_ret(exec_fn(self, *in_cols))
            else:

                def execute(self, in_cols):
                    args = {}
                    for (param_name, _), c in zip(input_columns, in_cols):
                        args[param_name] = c
                    return parse_ret(exec_fn(self, **args))

            wrapped_fn_or_class.execute = execute

        dtype = device_type
        if device_type is None and device_sets is None:
            dtype = DeviceType.CPU

        if device_type is not None and device_sets is not None:
            raise ScannerException(
                'Must only specify one of "device_type" or "device_sets" for python Op.')

        PYTHON_OP_REGISTRY[kname] = {
            'input_columns': input_columns,
            'output_columns': output_columns,
            'variadic_inputs': has_variadic_inputs,
            'stencil': stencil,
            'unbounded_state': unbounded_state,
            'bounded_state': bounded_state,
            'kernel': wrapped_fn_or_class,
            'device_type': dtype,
            'device_sets': device_sets,
            'batch': batch,
            'proto_path': proto_path,
            'registration_id': uuid.uuid4().hex
        }
        return fn_or_class

    return dec