Scanner Python API

scannerpy.client module

class scannerpy.client.Client(master=None, workers=None, start_cluster=True, config_path=None, config=None, debug=None, enable_watchdog=True, prefetch_table_metadata=True, no_workers_timeout=30, grpc_timeout=30, new_job_retries_limit=5, machine_params=None)[source]

Bases: object

Entrypoint for all Scanner operations.

Parameters
  • master (Optional[str]) – The address of the master process. The addresses should be formatted as ‘ip:port’. If the start_cluster flag is specified, the Client object will ssh into the provided address and start a master process. You should have ssh access to the target machine and scannerpy should be installed.

  • workers (Optional[List[str]]) – The list of addresses to spawn worker processes on. The addresses should be formatted as ‘ip:port’. Like with master, you should have ssh access to the target machine and scannerpy should be installed. If start_cluster is false, this parameter has no effect.

  • start_cluster (bool) – If true, a master process and worker processes will be spawned at the addresses specified by master and workers, respectively.

  • config_path (Optional[str]) – Path to a Scanner configuration TOML, by default assumed to be ‘~/.scanner/config.toml’.

  • config (Optional[Config]) – The scanner Config to use. If specified, config_path is ignored.

  • debug (Optional[bool]) – This flag is only relevant when start_cluster == True. If true, the master and worker servers are spawned in the same process as the invoking python code, enabling easy gdb-based debugging.

Other Parameters
  • prefetch_table_metadata

  • no_workers_timeout

  • grpc_timeout

Variables
  • config (Config) – The Config object used to initialize this Client.

  • ops (OpGenerator) –

    Represents the set of available Ops. Ops can be created like so:

    output = cl.ops.ExampleOp(arg='example')

    For a more detailed description, see OpGenerator

  • sources (SourceGenerator) – Represents the set of available Sources. Sources are created just like Ops. See SourceGenerator

  • sinks (SinkGenerator) – Represents the set of available Sinks. Sinks are created just like Ops. See SinkGenerator

  • streams (StreamsGenerator) – Used to specify which elements to sample from a sequence. See StreamsGenerator

  • partitioner (TaskPartitioner) – Used to specify how to split the elements in a sequence when performing a slice operation. See TaskPartitioner.

  • protobufs (ProtobufGenerator) – Used to construct protobuf objects that handle serialization/deserialization of the outputs of Ops.

batch_load(tables, column, callback, batch_size=1000, workers=16)[source]
bulk_fetch_video_metadata(tables)[source]
delete_table(name)[source]

Deletes a table from the database.

Parameters

name (str) – The name of the table to delete.

delete_tables(names)[source]

Deletes tables from the database.

Parameters

names (List[str]) – The names of the tables to delete.

get_active_jobs()[source]
get_profile(job_name, **kwargs)[source]
has_gpu()[source]
has_table(name)[source]

Checks if a table exists in the database.

Parameters

name (str) – The name of the table to check for.

Returns

True if the table exists, false otherwise.

Return type

bool

ingest_videos(videos, inplace=False, force=False)[source]

Creates tables from videos.

Parameters
  • videos (List[Tuple[str, str]]) – The list of videos to ingest into the client. Each element in the list should be (‘table_name’, ‘path/to/video’).

  • inplace (bool) – If true, ingests the videos without copying them into the client. Currently only supported for mp4 containers.

  • force (bool) – If true, deletes existing tables with the same names.

Return type

Tuple[List[Table], List[Tuple[str, str]]]

Returns

  • tables (List[Table]) – List of table objects for the ingested videos.

  • failures (List[Tuple[str, str]]) – List of (‘path/to/video’, ‘reason for failure’) tuples for each video which failed to ingest.

load_op(so_path, proto_path=None)[source]

Loads a custom op into the Scanner runtime.

Parameters
  • so_path (str) – Path to the custom op’s shared library (.so).

  • proto_path (Optional[str]) – Path to the custom op’s arguments protobuf if one exists.

Raises

ScannerException – Raised when the master fails to load the op.

new_table(name, columns, rows, fns=None, force=False)[source]

Creates a new table from a list of rows.

Parameters
  • name (str) – String name of the table to create

  • columns (List[str]) – List of names of table columns

  • rows (List[List[bytes]]) – List of rows with each row a list of elements corresponding to the specified columns. Elements must be strings of serialized representations of the data.

  • fns

  • force (bool) –

Returns

The new table object.

Return type

Table

register_op(name, input_columns, output_columns, variadic_inputs=False, stencil=None, unbounded_state=False, bounded_state=None, proto_path=None)[source]

Register a new Op with the Scanner master.

Parameters
  • name (str) – Name of the Op.

  • input_columns (List[Union[str, Tuple[str, ColumnType]]]) – A list of the inputs for this Op. Can be either the name of the input as a string or a tuple of (‘name’, ColumnType). If only the name is specified as a string, the ColumnType is assumed to be ColumnType.Blob.

  • output_columns (List[Union[str, Tuple[str, ColumnType]]]) – A list of the outputs for this Op. Can be either the name of the output as a string or a tuple of (‘name’, ColumnType). If only the name is specified as a string, the ColumnType is assumed to be ColumnType.Blob.

  • variadic_inputs (bool) – If true, this Op may take a variable number of inputs and input_columns is ignored. Variadic inputs are specified as positional arguments when invoking the Op, instead of keyword arguments.

  • stencil (Optional[List[int]]) – Specifies the default stencil to use for the Op. If none, indicates that the the Op does not have the ability to stencil. A stencil of [0] should be specified if the Op can stencil but should not by default.

  • unbounded_state (bool) – If true, indicates that the Op needs to see all previous elements of its input sequences before it can compute a given element. For example, to compute output element at index 100, the Op must have already produced elements 0-99. This option is mutually exclusive with bounded_state.

  • bounded_state (Optional[int]) – If true, indicates that the Op needs to see all previous elements of its input sequences before it can compute a given element. For example, to compute output element at index 100, the Op must have already produced elements 0-99. This option is mutually exclusive with bounded_state.

  • proto_path (Optional[str]) – Optional path to the proto file that describes the configuration arguments to this Op.

Raises

ScannerException – Raised when the master fails to register the Op.

register_python_kernel(op_name, device_type, kernel, batch=1)[source]

Register a Python Kernel with the Scanner master.

Parameters
  • op_name (str) – Name of the Op.

  • device_type (DeviceType) – The device type of the resource this kernel uses.

  • kernel (Union[function, builtin_function_or_method, Kernel]) – The class or function that implements the kernel.

  • batch (int) – Specifies a default for how many elements this kernel should batch over. If batch == 1, kernel is assume to not be able to batch.

Raises

ScannerException – Raised when the master fails to register the kernel.

run(outputs, perf_params, cache_mode=<CacheMode.Error: 1>, show_progress=True, profiling=False, task_timeout=0, checkpoint_frequency=10, detach=False, profiler_level=1)[source]

Runs a collection of jobs.

Parameters
  • outputs (Union[Sink, List[Sink]]) – The Sink or Sinks that should be processed.

  • perf_params (Callable[[], PerfParams]) – Performance-related parameters. These options should be tuned to improve the performance of executing a computation graph.

  • cache_mode (CacheMode) – Determines whether to overwrite, ignore, or raise an error when running a job on existing outputs.

  • show_progress (bool) – If true, will display an ASCII progress bar measuring job status.

  • profiling (bool) –

Other Parameters
  • task_timeout

  • checkpoint_frequency

Returns

The job id.

Return type

int

sequence(name)[source]
Return type

Column

start_master(master)[source]

Starts a Scanner master.

Parameters

master (str) – ssh-able address of the master node.

start_workers(workers)[source]

Starts Scanner workers.

Parameters

workers (List[str]) – list of ssh-able addresses of the worker nodes.

stop_cluster()[source]

Stops the Scanner master and workers.

summarize()[source]

Returns a human-readable summarization of the client state.

Return type

str

table(name)[source]

Retrieves a Table.

Parameters

name (str) – Name of the table to retrieve.

Returns

The table object.

Return type

Table

wait_on_job(*args, **kwargs)[source]
wait_on_job_gen(bulk_job_id, show_progress=True)[source]
scannerpy.client.start_master(port=None, config=None, config_path=None, block=False, watchdog=True, no_workers_timeout=30, new_job_retries_limit=5)[source]

Start a master server instance on this node.

Parameters
  • port (Optional[int]) – The port number to start the master on. If unspecified, it will be read from the provided Config.

  • config (Optional[Config]) – The scanner Config to use. If specified, config_path is ignored.

  • config_path (optional) – Path to a Scanner configuration TOML, by default assumed to be ~/.scanner/config.toml.

  • block (optional) – If true, will wait until the server is shutdown. Server will not shutdown currently unless wait_for_server_shutdown is eventually called.

  • watchdog (optional) – If true, the master will shutdown after a time interval if PokeWatchdog is not called.

  • no_workers_timeout (optional) – The interval after which the master will consider a job to have failed if it has no workers connected to it.

Returns

A cpp database instance.

Return type

Database

scannerpy.client.start_worker(master_address, machine_params=None, port=None, config=None, config_path=None, block=False, watchdog=True, db=None)[source]

Starts a worker instance on this node.

Parameters
  • master_address (str) – The address of the master server to connect this worker to. The expected format is ‘0.0.0.0:5000’ (ip:port).

  • machine_params – Describes the resources of the machine that the worker should manage. If left unspecified, the machine resources will be inferred.

  • config (Optional[Config]) – The Config object to use in creating the worker. If specified, config_path is ignored.

  • config_path (Optional[str]) – Path to a Scanner configuration TOML, by default assumed to be ~/.scanner/config.toml.

  • block (bool) – If true, will wait until the server is shutdown. Server will not shutdown currently unless wait_for_server_shutdown is eventually called.

  • watchdog (bool) – If true, the worker will shutdown after a time interval if PokeWatchdog is not called.

Other Parameters

db – This is for internal usage only.

Returns

A cpp database instance.

Return type

Database

scannerpy.config module

class scannerpy.config.Config(config_path=None, db_path=None)[source]

Bases: object

static default_config()[source]
static default_config_path()[source]
scannerpy.config.mkdir_p(path)[source]
scannerpy.config.read_line(s)[source]

scannerpy.op module

class scannerpy.op.Op(sc, name, inputs, device, batch=-1, warmup=-1, stencil=[0], args={}, extra=None, stream_args=None)[source]

Bases: object

inputs()[source]
outputs()[source]
to_proto(indices)[source]
class scannerpy.op.OpColumn(sc, op, col, typ)[source]

Bases: object

compress(codec='video', **kwargs)[source]
compress_default()[source]
compress_video(quality=-1, bitrate=-1, keyframe_distance=-1)[source]
lossless()[source]
class scannerpy.op.OpGenerator(sc)[source]

Bases: object

Creates Op instances to define a computation.

When a particular op is requested from the generator, e.g. sc.ops.Histogram, the generator does a dynamic lookup for the op in a C++ registry.

class scannerpy.op.SliceList[source]

Bases: list

List of per-stream arguments to each slice of an op.

scannerpy.op.check_modules(sc)[source]
scannerpy.op.collect_per_stream_args(name, protobuf_name, kwargs)[source]
scannerpy.op.register_module(so_path, proto_path=None)[source]
scannerpy.op.register_python_op(name=None, stencil=None, unbounded_state=False, bounded_state=None, device_type=None, device_sets=None, batch=1, proto_path=None)[source]

Class or function decorator which registers a new Op and Kernel with the Scanner master.

Parameters
  • name (Optional[str]) – Optional name for the Op. By default, it will be inferred as the name of the decorated class/kernel.

  • stencil (Optional[List[int]]) – Specifies the default stencil to use for the Op. If none, indicates that the the Op does not have the ability to stencil. A stencil of [0] should be specified if the Op can stencil but should not by default.

  • unbounded_state (bool) – If true, indicates that the Op needs to see all previous elements of its input sequences before it can compute a given element. For example, to compute output element at index 100, the Op must have already produced elements 0-99. This option is mutually exclusive with bounded_state.

  • bounded_state (Optional[int]) – If true, indicates that the Op needs to see all previous elements of its input sequences before it can compute a given element. For example, to compute output element at index 100, the Op must have already produced elements 0-99. This option is mutually exclusive with bounded_state.

  • device_type (Optional[DeviceType]) –

  • device_sets (Optional[List[Tuple[DeviceType, int]]]) –

  • batch (int) –

  • proto_path (Optional[str]) – Optional path to the proto file that describes the configuration arguments to this Op.

scannerpy.types module

class scannerpy.types.Bbox

Bases: object

deserialize()
serialize()
class scannerpy.types.BboxList

Bases: object

deserialize()
serialize()
class scannerpy.types.FrameType[source]

Bases: object

class scannerpy.types.Histogram

Bases: object

deserialize()
serialize()
class scannerpy.types.Image[source]

Bases: object

deserialize()[source]
serialize()[source]
class scannerpy.types.NumpyArrayFloat32[source]

Bases: object

deserialize()[source]
serialize()[source]
class scannerpy.types.NumpyArrayInt32[source]

Bases: object

deserialize()[source]
serialize()[source]
scannerpy.types.ProtobufType(name, proto)[source]
class scannerpy.types.ScannerTypeInfo(type, cpp_name, serialize, deserialize)[source]

Bases: object

scannerpy.types.UniformList(name, typ, size=None, parts=None)[source]
scannerpy.types.VariableList(name, typ)[source]
scannerpy.types.get_type_info(ty)[source]
scannerpy.types.get_type_info_cpp(cpp_name)[source]
scannerpy.types.register_type(cls)[source]

scannerpy.io module

class scannerpy.io.IOGenerator(sc)[source]

Bases: object

Input(streams)[source]
Output(op, streams)[source]

scannerpy.streams module

class scannerpy.streams.StreamsGenerator(sc)[source]

Bases: object

Provides Ops for sampling elements from streams.

The methods of this class construct Scanner Ops that enable selecting subsets of the elements in a stream to produce new streams.

This class should not be constructed directly, but accessed via a Database object like:

sc.streams.Range(input)

All(input)[source]

Samples all elements from the stream.

Serves as an identity sampling function.

Parameters

input (OpColumn) – The stream to sample.

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

Gather(input, indices)[source]

Samples a list of elements from the input stream.

Parameters
  • input (OpColumn) – The stream to sample.

  • rows – A list of the indices to sample.

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

Range(input, ranges)[source]

Samples a range of elements from the input stream.

Parameters
  • input (OpColumn) – The stream to sample.

  • ranges (Sequence[Tuple[int]]) – Pairs of (start, end) for each stream being processed.

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

Examples

For example, to select frames 0-10 for one stream, you would write:

sc.streams.Ranges(input=input, ranges=[(0, 11)])

Ranges(input, intervals)[source]

Samples multiple ranges of elements from the input stream.

Parameters
  • input (OpColumn) – The stream to sample.

  • intervals (List[Sequence[Tuple[int, int]]]) – The intervals to sample from. This should be a list of tuples representing start and end ranges.

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

Examples

For example, to select frames 0-10 and 100-200 for one stream, you would write:

sc.streams.Ranges(input=input, intervals=[[(0, 11), (100, 201)]])

Repeat(input, spacings)[source]

Expands a sequence by repeating elements.

Parameters
  • input (OpColumn) – The stream to expand.

  • spacing

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

RepeatNull(input, spacings)[source]

Expands a sequence by inserting nulls.

Parameters
  • input (OpColumn) – The stream to expand.

  • spacing

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

Slice(input, partitions)[source]

Partitions a stream into independent substreams.

Parameters
  • input (OpColumn) – The stream to partition.

  • partitioner – The partitioner that should be used to split the stream into substreams.

Returns

A new stream which represents multiple substreams.

Return type

scannerpy.op.OpColumn

Stride(input, strides)[source]

Samples every n’th element from the stream, where n is the stride.

Parameters
  • input (OpColumn) – The stream to sample.

  • strides – The list of strides for each input stream.

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

StridedRange(input, ranges)[source]

Samples a strided range of elements from the input stream.

Parameters
  • input (OpColumn) – The stream to sample.

  • start – The default index to start sampling from.

  • end – The default index to end sampling at.

  • stride – The default value to stride by.

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

StridedRanges(input, intervals=None, stride=None)[source]

Samples strided ranges of elements from the input stream.

Parameters
  • input (OpColumn) – The stream to sample.

  • intervals (Optional[Sequence[Tuple[int, int]]]) – The default intervals to sample from. This should be a list of tuples representing start and end ranges.

  • stride (Optional[int]) – The default value to stride by.

Returns

The sampled stream.

Return type

scannerpy.op.OpColumn

Unslice(input)[source]

Joins substreams back together.

Parameters

input (OpColumn) – The stream which contains substreams to join back together.

Returns

A new stream which is the concatentation of the input substreams.

Return type

scannerpy.op.OpColumn

scannerpy.source module

class scannerpy.source.Source(sc, name, enumerator_args, source_args={})[source]

Bases: object

outputs()[source]
to_proto(indices)[source]
class scannerpy.source.SourceGenerator(sc)[source]

Bases: object

Creates Source instances to define a computation.

When a particular Source is requested from the generator, e.g. sc.source.Sequence, the generator does a dynamic lookup for the Source in the servers registry.

scannerpy.sink module

class scannerpy.sink.Sink(sc, name, inputs, job_args, sink_args={})[source]

Bases: object

inputs()[source]
to_proto(indices)[source]
class scannerpy.sink.SinkGenerator(sc)[source]

Bases: object

Creates Sink instances to define a computation.

When a particular Sink is requested from the generator, e.g. sc.sink.Column, the generator does a dynamic lookup for the Sink in the servers registry.

scannerpy.job module

class scannerpy.job.Job(op_args)[source]

Bases: object

A specification of a table to produce as output of a bulk job.

op_args()[source]

scannerpy.column module

class scannerpy.column.Column(table, name)[source]

Bases: object

A column of a Table.

id()[source]
keyframes()[source]
load(ty=None, fn=None, rows=None, workers=16)[source]

Loads the results of a Scanner computation into Python.

Kwargs:
fn: Optional function to apply to the binary blobs as they are read

in.

Returns

Generator that yields either a numpy array for frame columns or a binary blob for non-frame columns (optionally processed by the fn).

name()[source]
save_mp4(output_name, fps=None, scale=None)[source]
type()[source]

scannerpy.common module

class scannerpy.common.CacheMode[source]

Bases: enum.Enum

An enumeration.

Error = 1
Ignore = 2
Overwrite = 3
class scannerpy.common.ColumnType[source]

Bases: enum.Enum

Enum for specifying what the type of a column is.

Blob = 0
Video = 1
to_proto = <function ColumnType.to_proto>[source]
class scannerpy.common.CustomFormatter(fmt=None, datefmt=None, style='%')[source]

Bases: logging.Formatter

Initialize the formatter with specified format strings.

Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument (if omitted, you get the ISO8601 format).

Use a style parameter of ‘%’, ‘{‘ or ‘$’ to specify that you want to use one of %-formatting, str.format() ({}) formatting or string.Template formatting in your format string.

Changed in version 3.2: Added the style parameter.

format(record)[source]

Format the specified record as text.

The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime(), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.

class scannerpy.common.DeviceHandle(device, device_id)[source]

Bases: object

class scannerpy.common.DeviceType[source]

Bases: enum.Enum

Enum for specifying where an Op should run.

CPU = 0
GPU = 1
to_proto = <function DeviceType.to_proto>[source]
class scannerpy.common.PerfParams(work_packet_size, io_packet_size, cpu_pool=None, gpu_pool=None, pipeline_instances_per_node=None, load_sparsity_threshold=8, queue_size_per_pipeline=4)[source]

Bases: object

Parameters
  • work_packet_size (int) – The size of the packets of intermediate elements to pass between operations. This parameter only affects performance and should not affect the output.

  • io_packet_size (int) – The size of the packets of elements to read and write from Sources and sinks. This parameter only affects performance and should not affect the output. When reading and writing to high latency storage (such as the cloud), it is helpful to increase this value.

  • cpu_pool (Optional[str]) – A string describing the size of the CPU memory pool to initialize. If none, no memory pool is used.

  • gpu_pool (Optional[str]) – A string describing the size of the GPU memory pool to initialize. If none, no memory pool is used.

  • pipeline_instances_per_node (Optional[int]) – The number of concurrent instances of the computation graph to execute. If set to None, it will be automatically inferred based on computation graph and the available machine resources.

  • load_sparsity_threshold (int) –

  • queue_size_per_pipeline (int) – The max number of tasks that a worker will request from the master for each pipeline instance. This influences the amount of data that can will be resident in memory at once.

classmethod estimate(max_memory_util=0.7, total_memory=None, work_io_ratio=0.2, queue_size_per_pipeline=4, **kwargs)[source]

Guess the best value of each performance parameters given the computation graph.

Parameters
  • max_memory_util (float) – Target maximum memory utilization as a fraction of the total system memory, e.g. 0.5 means Scanner should try to use 50% of the machine’s memory.

  • total_memory (Optional[int]) – Total memory on the worker machines in bytes. Memory of the current machine will be used if none is is provided.

  • work_io_ratio (float) – Ratio of work_packet_size to io_packet_size.

  • queue_size_per_pipeline (int) – The max number of tasks potentially resident for each pipeline on a worker.

classmethod manual(work_packet_size, io_packet_size, **kwargs)[source]

Explicitly provide values for each performance parameter.

See class definition for explanation of each parameter.

Parameters
  • work_packet_size

  • io_packet_size

exception scannerpy.common.ScannerException[source]

Bases: Exception

scannerpy.kernel module

class scannerpy.kernel.Kernel(config, init_parameter=None)[source]

Bases: object

Parameters
  • config (KernelConfig) – Contains the configuration settings for this instance of the kernel.

  • init_parameter

    An example init parameter. Any parameters defined in the __init__ method of a Kernel can be set when creating an instance of the corresponding operation. For example, an operation for this Kernel could be initialized like this:

    cl.ops.Kernel(init_parameter='test', ...)

close()[source]

Called when this Kernel instance will no longer be used.

execute(stream_parameter)[source]

Runs the kernel on input elements and returns new output elements.

Parameters

stream_parameter (bytes) – An example stream parameter. Must be annotated with a stream parameter type. See Stream Parameters.

Returns

The outputs for the operation.

Return type

bytes

fetch_resources()[source]

Runs once per Scanner worker to download resources for running this operation.

new_stream()[source]

Runs after fetch_resources for each instance of this operation.

Parameters added for this method by operations are considered stream config parameters (see Stream Config Parameters).

reset()[source]

Called for stateful operations when the operation should reset its logical state.

setup_with_resources()[source]

Runs after fetch_resources for each instance of this operation.

This method is reponsible for handling any setup that requires resources to first be downloaded.

class scannerpy.kernel.KernelConfig(config)[source]

Bases: object

scannerpy.kernel.python_kernel_fn(n, recv_conn, send_conn, p_conn1, p_conn2)[source]

scannerpy.partitioner module

class scannerpy.partitioner.TaskPartitioner(sc)[source]

Bases: object

Utility for specifying how to partition the output domain of a job into tasks.

all(group_size=250)[source]
gather(groups)[source]
range(start, end)[source]
ranges(intervals)[source]
strided(stride, group_size=250)[source]
strided_range(start, end, stride)[source]
strided_ranges(intervals, stride)[source]

scannerpy.profiler module

class scannerpy.profiler.Profile(sc, job_id, load_threads=8, subsample=None)[source]

Bases: object

Contains profiling information about Scanner jobs.

statistics()[source]
total_time_interval()[source]
write_trace(path)[source]

Generates a trace file in Chrome format.

To visualize the trace, visit chrome://tracing in Google Chrome and click “Load” in the top left to load the trace.

Parameters

path (str) – Output path to write the trace.

scannerpy.profiler.read_advance(fmt, buf, offset)[source]
scannerpy.profiler.unpack_string(buf, offset)[source]

scannerpy.protobuf_generator module

scannerpy.sampler module

class scannerpy.sampler.Sampler(sc)[source]

Bases: object

Utility for specifying which frames of a video (or which rows of a table) to run a computation over.

All(input)[source]
Gather(input, rows=None)[source]
Range(input, start=None, end=None)[source]
Ranges(input, intervals=None)[source]
Repeat(input, spacing=None)[source]
RepeatNull(input, spacing=None)[source]
Stride(input, stride=None)[source]
StridedRange(input, start=None, end=None, stride=None)[source]
StridedRanges(input, intervals=None, stride=None)[source]

scannerpy.storage module

class scannerpy.storage.NamedStorage(storage_config=None)[source]

Bases: scannerpy.storage.StorageBackend

Named storage for byte streams. Useful default output format for non-video-data.

Stores byte streams in a custom packed binary file format. Supports both local filesystems (Linux/Posix) and cloud file systems (S3, GCS).

delete(sc, streams)[source]

Deletes the streams from storage if they exist.

Parameters
  • sc (Client) – Scanner client

  • streams (List[StoredStream]) – List of StoredStream objects of the same storage type

sink(sc, op, streams)[source]

Returns the Scanner sink corresponding to this storage backend.

Parameters
  • sc (Client) – Scanner client

  • op (Op) – Input op from computation graph

  • streams (List[StoredStream]) – List of StoredStream objects of the same storage type

Returns

sink – Scanner sink

Return type

Sink

source(sc, streams)[source]

Returns the Scanner source corresponding to this storage backend.

Parameters
  • sc (Client) – Scanner client

  • streams (List[StoredStream]) – List of StoredStream objects of the same storage type

Returns

source – Scanner source

Return type

Source

class scannerpy.storage.NamedStream(sc, name, storage=None)[source]

Bases: scannerpy.storage.StoredStream

Stream of elements stored on disk with a given name.

Parameters
  • sc (Client) – Scanner client.

  • name (str) – Name of the stream.

  • storage (NamedStorage) – Optional NamedStorage object.

committed()[source]

Check if a stream is completely materialized in storage.

A stream may exist, but not be committed if a Scanner job was run that was supposed to output the stream, but the job failed for any reason.

exists()[source]

Check if any part of a stream exists in storage.

len()[source]

Get the number of elements in this stream.

load_bytes(rows=None)[source]

A generator that incrementally loads raw bytes from the stored stream.

This function is not intended to be called directly by the user. See load().

Parameters

rows (List[int]) – List of indices in the stream to load. Default is all elements.

name()[source]

Gets a human interpretable name for this stored stream.

storage()[source]

Get the storage backend corresponding to this stream.

type()[source]

Get the Scanner type of elements in this stream if it exists, and return None otherwise.

class scannerpy.storage.NamedVideoStorage(storage_config=None)[source]

Bases: scannerpy.storage.NamedStorage

Named storage for video streams. Special baked-in storage class for keeping videos compressed.

ingest(sc, streams, batch=500)[source]
sink(sc, op, streams)[source]

Returns the Scanner sink corresponding to this storage backend.

Parameters
  • sc (Client) – Scanner client

  • op (Op) – Input op from computation graph

  • streams (List[StoredStream]) – List of StoredStream objects of the same storage type

Returns

sink – Scanner sink

Return type

Sink

source(sc, streams)[source]

Returns the Scanner source corresponding to this storage backend.

Parameters
  • sc (Client) – Scanner client

  • streams (List[StoredStream]) – List of StoredStream objects of the same storage type

Returns

source – Scanner source

Return type

Source

class scannerpy.storage.NamedVideoStream(sc, name, path=None, inplace=False, storage=None)[source]

Bases: scannerpy.storage.NamedStream

Stream of video frame stored (compressed) in named storage.

Parameters
  • sc (Client) – Scanner client

  • name (str) – Name of the stream in Scanner storage.

  • path (str) – Path to corresponding video file. If this parameter is given, the named video stream will be created from the video at the given path.

  • storage (NamedFrameStorage) – Optional NamedFrameStorage object.

as_hwang()[source]
committed()[source]

Check if a stream is completely materialized in storage.

A stream may exist, but not be committed if a Scanner job was run that was supposed to output the stream, but the job failed for any reason.

estimate_size()[source]

Estimates the size in bytes of elements in the stream. Not guaranteed to be accurate, just used for heuristics.

exists()[source]

Check if any part of a stream exists in storage.

len()[source]

Get the number of elements in this stream.

load_bytes(rows=None)[source]

A generator that incrementally loads raw bytes from the stored stream.

This function is not intended to be called directly by the user. See load().

Parameters

rows (List[int]) – List of indices in the stream to load. Default is all elements.

save_mp4(output_name, fps=None, scale=None)[source]
class scannerpy.storage.NullElement[source]

Bases: object

Represents a ‘null’ output in a stream generated through Scanner.

These can show up e.g. when you use a spacing/repeat operator. NullElement is used instead of the more generic Python ‘None’, as a Python kernel can have ‘None’ as a valid output which isn’t the same as a ‘null’ stream element.

class scannerpy.storage.StorageBackend[source]

Bases: object

I/O backend for streams fed in/out of Scanner.

delete(sc, streams)[source]

Deletes the streams from storage if they exist.

Parameters
  • sc (Client) – Scanner client

  • streams (List[StoredStream]) – List of StoredStream objects of the same storage type

sink(sc, op, streams)[source]

Returns the Scanner sink corresponding to this storage backend.

Parameters
  • sc (Client) – Scanner client

  • op (Op) – Input op from computation graph

  • streams (List[StoredStream]) – List of StoredStream objects of the same storage type

Returns

sink – Scanner sink

Return type

Sink

source(sc, streams)[source]

Returns the Scanner source corresponding to this storage backend.

Parameters
  • sc (Client) – Scanner client

  • streams (List[StoredStream]) – List of StoredStream objects of the same storage type

Returns

source – Scanner source

Return type

Source

class scannerpy.storage.StoredStream[source]

Bases: object

Handle to a stream stored in a particular storage backend.

In general, a StoredStream is not guaranteed to exist, but you can always check this using exists().

committed()[source]

Check if a stream is completely materialized in storage.

A stream may exist, but not be committed if a Scanner job was run that was supposed to output the stream, but the job failed for any reason.

Return type

bool

delete(sc)[source]

Deletes the stream from its storage if it exists.

Parameters

sc (Client) – Scanner client

estimate_size()[source]

Estimates the size in bytes of elements in the stream. Not guaranteed to be accurate, just used for heuristics.

Return type

int

exists()[source]

Check if any part of a stream exists in storage.

Return type

bool

len()[source]

Get the number of elements in this stream.

Return type

int

load(ty=None, fn=None, rows=None)[source]

Load elements from the stored stream into Python.

Parameters
  • ty (type) – Scanner type of elements in the stream. If the storage backend recorded the type, that will be used by default.

  • fn (Callable[[bytes], Any]) – Deserialization function that maps elements as bytes to their actual type.

  • rows (List[int]) – List of indices in the stream to load. Default is all elements.

Returns

generator – A generator that outputs one stream element at a time.

Return type

Generator[Any, None, None]

load_bytes(rows=None)[source]

A generator that incrementally loads raw bytes from the stored stream.

This function is not intended to be called directly by the user. See load().

Parameters

rows (List[int]) – List of indices in the stream to load. Default is all elements.

Return type

Generator[bytes, None, None]

name()[source]

Gets a human interpretable name for this stored stream.

Return type

str

storage()[source]

Get the storage backend corresponding to this stream.

Return type

StorageBackend

type()[source]

Get the Scanner type of elements in this stream if it exists, and return None otherwise.

Return type

type

scannerpy.kube module

class scannerpy.kube.CloudConfig(project, service_key=NOTHING, storage_key_id=NOTHING, storage_key_secret=NOTHING)[source]

Bases: object

class scannerpy.kube.Cluster(cloud_config, cluster_config, no_start=False, no_delete=False, containers=None)[source]

Bases: object

cli()[source]
client(retries=3, **kwargs)[source]
config()[source]
create_object(template)[source]
delete(prompt=False)[source]
get_by_owner(ty, owner, namespace='default')[source]
get_credentials()[source]
get_kube_info(kind, namespace='default')[source]
get_object(info, name)[source]
get_pod(deployment, namespace='default')[source]
healthy()[source]
job_status()[source]
make_container(name, machine_config)[source]
make_deployment(name, machine_config, replicas)[source]
master_address()[source]
master_logs(previous=False)[source]
monitor(sc)[source]
resize(size)[source]
resource_metrics()[source]
running(pool='default-pool')[source]
start(reset=True, wait=True)[source]
trace(path, subsample=None, job=None)[source]
worker_logs(n, previous=False)[source]
class scannerpy.kube.ClusterConfig(id, num_workers, master, worker, zone='us-east1-b', kube_version='latest', num_load_workers=8, num_save_workers=8, autoscale=True, no_workers_timeout=600, scopes=frozenset({'https://www.googleapis.com/auth/compute', 'https://www.googleapis.com/auth/devstorage.read_write', 'https://www.googleapis.com/auth/logging.write', 'https://www.googleapis.com/auth/monitoring', 'https://www.googleapis.com/auth/pubsub', 'https://www.googleapis.com/auth/service.management.readonly', 'https://www.googleapis.com/auth/servicecontrol', 'https://www.googleapis.com/auth/trace.append'}), scanner_config='/root/.scanner/config.toml', pipelines=frozenset({}))[source]

Bases: object

price(no_master=False)[source]
class scannerpy.kube.MachineConfig(image, type, disk, gpu=0, gpu_type='nvidia-tesla-p100', preemptible=False)[source]

Bases: object

price()[source]
class scannerpy.kube.MachineType[source]

Bases: abc.ABC

get_cpu()[source]
get_mem()[source]
get_name()[source]
class scannerpy.kube.MachineTypeName(name)[source]

Bases: scannerpy.kube.MachineType

get_cpu()[source]
get_mem()[source]
get_name()[source]
class scannerpy.kube.MachineTypeQuantity(cpu, mem)[source]

Bases: scannerpy.kube.MachineType

get_cpu()[source]
get_mem()[source]
get_name()[source]
scannerpy.kube.master()[source]
scannerpy.kube.run(s, detach=False)[source]
scannerpy.kube.worker()[source]