Scanner Python API

scannerpy.database module

class scannerpy.database.Database(master=None, workers=None, start_cluster=True, config_path=None, config=None, debug=None, enable_watchdog=True, prefetch_table_metadata=True, no_workers_timeout=30, grpc_timeout=30, new_job_retries_limit=5, machine_params=None)[source]

Bases: object

Entrypoint for all Scanner operations.

Parameters:
  • master (Optional[str]) – The address of the master process, formatted as 'ip:port'. If the start_cluster flag is specified, the Database object will ssh into the provided address and start a master process. You should have ssh access to the target machine, and scannerpy should be installed there.
  • workers (Optional[List[str]]) – The list of addresses to spawn worker processes on, formatted as 'ip:port'. As with master, you should have ssh access to the target machines, and scannerpy should be installed there. If start_cluster is false, this parameter has no effect.
  • start_cluster (bool) – If true, a master process and worker processes will be spawned at the addresses specified by master and workers, respectively.
  • config_path (Optional[str]) – Path to a Scanner configuration TOML, by default assumed to be ‘~/.scanner/config.toml’.
  • config (Optional[Config]) – The scanner Config to use. If specified, config_path is ignored.
  • debug (Optional[bool]) – This flag is only relevant when start_cluster == True. If true, the master and worker servers are spawned in the same process as the invoking python code, enabling easy gdb-based debugging.
Other Parameters:
 
  • prefetch_table_metadata
  • no_workers_timeout
  • grpc_timeout
Variables:
  • config (Config) – The Config object used to initialize this Database.
  • ops (OpGenerator) –

    Represents the set of available Ops. Ops can be created like so:

output = db.ops.ExampleOp(arg='example')

    For a more detailed description, see OpGenerator

  • sources (SourceGenerator) – Represents the set of available Sources. Sources are created just like Ops. See SourceGenerator
  • sinks (SinkGenerator) – Represents the set of available Sinks. Sinks are created just like Ops. See SinkGenerator
  • streams (StreamsGenerator) – Used to specify which elements to sample from a sequence. See StreamsGenerator
  • partitioner (TaskPartitioner) – Used to specify how to split the elements in a sequence when performing a slice operation. See TaskPartitioner.
  • protobufs (ProtobufGenerator) – Used to construct protobuf objects that handle serialization/deserialization of the outputs of Ops.
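Putting these attributes together, a minimal pipeline sketch might look like the following (the FrameColumn source, Histogram op, and Column sink are drawn from Scanner's standard library; the table and column names are illustrative):

```python
def histogram_pipeline():
    # Deferred import so this sketch can be defined without a live
    # Scanner installation.
    from scannerpy import Database, Job

    db = Database()  # spawns a local master and worker by default

    # Sources produce streams, Ops transform them, Sinks write them out.
    frame = db.sources.FrameColumn()
    hist = db.ops.Histogram(frame=frame)
    output = db.sinks.Column(columns={'histogram': hist})

    # A Job binds the graph's sources and sinks to concrete tables.
    job = Job(op_args={
        frame: db.table('example_video').column('frame'),
        output: 'example_hist'
    })
    return db.run(output=output, jobs=[job], force=True)
```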
bulk_fetch_video_metadata(tables)[source]
delete_table(name)[source]

Deletes a table from the database.

Parameters:name (str) – The name of the table to delete.
delete_tables(names)[source]

Deletes tables from the database.

Parameters:names (List[str]) – The names of the tables to delete.
get_active_jobs()[source]
has_gpu()[source]
has_table(name)[source]

Checks if a table exists in the database.

Parameters:name (str) – The name of the table to check for.
Returns:True if the table exists, False otherwise.
Return type:bool
ingest_videos(videos, inplace=False, force=False)[source]

Creates tables from videos.

Parameters:
  • videos (List[Tuple[str, str]]) – The list of videos to ingest into the database. Each element in the list should be ('table_name', 'path/to/video').
  • inplace (bool) – If true, ingests the videos without copying them into the database. Currently only supported for mp4 containers.
  • force (bool) – If true, deletes existing tables with the same names.
Return type:

Tuple[List[Table], List[Tuple[str, str]]]

Returns:

  • tables (List[Table]) – List of table objects for the ingested videos.
  • failures (List[Tuple[str, str]]) – List of (‘path/to/video’, ‘reason for failure’) tuples for each video which failed to ingest.
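For instance, ingesting two videos and reporting any failures might look like this sketch (the paths are placeholders):

```python
def ingest_example_videos():
    from scannerpy import Database

    db = Database()
    tables, failures = db.ingest_videos(
        [('video1', '/path/to/video1.mp4'),
         ('video2', '/path/to/video2.mp4')],
        force=True)
    # Each failure is a ('path/to/video', 'reason') tuple.
    for path, reason in failures:
        print('Could not ingest {}: {}'.format(path, reason))
    return tables
```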

load_op(so_path, proto_path=None)[source]

Loads a custom op into the Scanner runtime.

Parameters:
  • so_path (str) – Path to the custom op’s shared library (.so).
  • proto_path (Optional[str]) – Path to the custom op’s arguments protobuf if one exists.
Raises:

ScannerException – Raised when the master fails to load the op.

new_table(name, columns, rows, fns=None, force=False)[source]

Creates a new table from a list of rows.

Parameters:
  • name (str) – String name of the table to create
  • columns (List[str]) – List of names of table columns
  • rows (List[List[bytes]]) – List of rows with each row a list of elements corresponding to the specified columns. Elements must be strings of serialized representations of the data.
  • fns
  • force (bool) –
Returns:

The new table object.

Return type:

Table
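Since elements must be serialized bytes, rows are typically built with struct.pack or protobuf serialization before calling new_table. A sketch (the two-column schema is illustrative; the db.new_table call is shown commented since it requires a running Database):

```python
import struct

# Two columns: a little-endian int64 index and a UTF-8 name.
columns = ['index', 'name']
rows = [[struct.pack('<q', i), 'item-{}'.format(i).encode('utf-8')]
        for i in range(3)]

# With a Database instance `db`:
# table = db.new_table('example', columns, rows, force=True)
```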

profiler(job_name)[source]
register_op(name, input_columns, output_columns, variadic_inputs=False, stencil=None, unbounded_state=False, bounded_state=None, proto_path=None)[source]

Register a new Op with the Scanner master.

Parameters:
  • name (str) – Name of the Op.
  • input_columns (List[Union[str, Tuple[str, ColumnType]]]) – A list of the inputs for this Op. Can be either the name of the input as a string or a tuple of (‘name’, ColumnType). If only the name is specified as a string, the ColumnType is assumed to be ColumnType.Blob.
  • output_columns (List[Union[str, Tuple[str, ColumnType]]]) – A list of the outputs for this Op. Can be either the name of the output as a string or a tuple of (‘name’, ColumnType). If only the name is specified as a string, the ColumnType is assumed to be ColumnType.Blob.
  • variadic_inputs (bool) – If true, this Op may take a variable number of inputs and input_columns is ignored. Variadic inputs are specified as positional arguments when invoking the Op, instead of keyword arguments.
  • stencil (Optional[List[int]]) – Specifies the default stencil to use for the Op. If None, indicates that the Op does not have the ability to stencil. A stencil of [0] should be specified if the Op can stencil but should not by default.
  • unbounded_state (bool) – If true, indicates that the Op needs to see all previous elements of its input sequences before it can compute a given element. For example, to compute output element at index 100, the Op must have already produced elements 0-99. This option is mutually exclusive with bounded_state.
  • bounded_state (Optional[int]) – If specified, indicates that the Op needs to see the previous bounded_state elements of its input sequences before it can compute a given element. For example, with bounded_state=2, to compute the output element at index 100 the Op must have already seen elements 98 and 99. This option is mutually exclusive with unbounded_state.
  • proto_path (Optional[str]) – Optional path to the proto file that describes the configuration arguments to this Op.
Raises:

ScannerException – Raised when the master fails to register the Op.

register_python_kernel(op_name, device_type, kernel, batch=1)[source]

Register a Python Kernel with the Scanner master.

Parameters:
  • op_name (str) – Name of the Op.
  • device_type (DeviceType) – The device type of the resource this kernel uses.
  • kernel (Union[function, builtin_function_or_method, Kernel]) – The class or function that implements the kernel.
  • batch (int) – Specifies a default for how many elements this kernel should batch over. If batch == 1, the kernel is assumed to be unable to batch.
Raises:

ScannerException – Raised when the master fails to register the kernel.
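register_op and register_python_kernel are typically used as a pair: declare the interface, then attach an implementation. A sketch (the Flip op and its mirroring behavior are illustrative):

```python
def register_flip(db):
    import numpy as np
    from scannerpy import DeviceType, ColumnType

    # Declare the op's interface with the master...
    db.register_op('Flip',
                   input_columns=[('frame', ColumnType.Video)],
                   output_columns=[('flipped', ColumnType.Video)])

    # ...then attach a Python implementation. A function kernel receives
    # its config followed by one argument per input column.
    def flip(config, frame):
        return np.ascontiguousarray(frame[:, ::-1, :])

    db.register_python_kernel('Flip', DeviceType.CPU, flip)
```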

run(output, jobs, force=False, work_packet_size=32, io_packet_size=128, cpu_pool=None, gpu_pool=None, pipeline_instances_per_node=None, show_progress=True, profiling=False, load_sparsity_threshold=8, tasks_in_queue_per_pu=4, task_timeout=0, checkpoint_frequency=1000, detach=False)[source]

Runs a collection of jobs.

Parameters:
  • output (Sink) – The Sink that should be processed.
  • jobs (Sequence[Job]) – The set of jobs to process. All of these jobs should share the same graph of operations.
  • force (bool) – If the output tables already exist, overwrite them.
  • work_packet_size (int) – The size of the packets of intermediate elements to pass between operations. This parameter only affects performance and should not affect the output.
  • io_packet_size (int) – The size of the packets of elements to read and write from Sources and Sinks. This parameter only affects performance and should not affect the output. When reading and writing to high-latency storage (such as the cloud), it is helpful to increase this value.
  • cpu_pool (Optional[str]) –
  • gpu_pool (Optional[str]) –
  • pipeline_instances_per_node (Optional[int]) – The number of concurrent instances of the computation graph to execute. If set to None, it will be automatically inferred based on computation graph and the available machine resources.
  • show_progress (bool) – If true, will display an ASCII progress bar measuring job status.
  • profiling (bool) –
Other Parameters:
 
  • load_sparsity_threshold
  • tasks_in_queue_per_pu
  • task_timeout
  • checkpoint_frequency
Returns:

The new table objects if output is a db.sinks.Column, otherwise an empty list.

Return type:

List[Table]
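The two packet-size knobs affect throughput but not results; a sketch of tuning them for high-latency (e.g. cloud) storage (the specific values are illustrative starting points):

```python
def run_tuned(db, output, jobs):
    # Larger IO packets amortize round trips to high-latency storage,
    # while smaller work packets keep pipeline stages busy.
    return db.run(output=output, jobs=jobs,
                  force=True,
                  work_packet_size=32,
                  io_packet_size=512,
                  show_progress=True)
```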

start_master(master)[source]

Starts a Scanner master.

Parameters:master (str) – ssh-able address of the master node.
start_workers(workers)[source]

Starts Scanner workers.

Parameters:workers (List[str]) – list of ssh-able addresses of the worker nodes.
stop_cluster()[source]

Stops the Scanner master and workers.

summarize()[source]

Returns a human-readable summary of the database state.

Return type:str
table(name)[source]

Retrieves a Table.

Parameters:name (str) – Name of the table to retrieve.
Returns:The table object.
Return type:Table
wait_on_job(*args, **kwargs)[source]
wait_on_job_gen(bulk_job_id, show_progress=True)[source]
scannerpy.database.start_master(port=None, config=None, config_path=None, block=False, watchdog=True, no_workers_timeout=30, new_job_retries_limit=5)[source]

Start a master server instance on this node.

Parameters:
  • port (Optional[int]) – The port number to start the master on. If unspecified, it will be read from the provided Config.
  • config (Optional[Config]) – The scanner Config to use. If specified, config_path is ignored.
  • config_path (optional) – Path to a Scanner configuration TOML, by default assumed to be ~/.scanner/config.toml.
  • block (optional) – If true, will wait until the server is shut down. The server will not shut down unless wait_for_server_shutdown is eventually called.
  • watchdog (optional) – If true, the master will shutdown after a time interval if PokeWatchdog is not called.
  • no_workers_timeout (optional) – The interval after which the master will consider a job to have failed if it has no workers connected to it.
Returns:

A C++ database instance.

Return type:

Database

scannerpy.database.start_worker(master_address, machine_params=None, port=None, config=None, config_path=None, block=False, watchdog=True, num_workers=None, db=None)[source]

Starts a worker instance on this node.

Parameters:
  • master_address (str) – The address of the master server to connect this worker to. The expected format is '0.0.0.0:5000' (ip:port).
  • machine_params – Describes the resources of the machine that the worker should manage. If left unspecified, the machine resources will be inferred.
  • config (Optional[Config]) – The Config object to use in creating the worker. If specified, config_path is ignored.
  • config_path (Optional[str]) – Path to a Scanner configuration TOML, by default assumed to be ~/.scanner/config.toml.
  • block (bool) – If true, will wait until the server is shut down. The server will not shut down unless wait_for_server_shutdown is eventually called.
  • watchdog (bool) – If true, the worker will shutdown after a time interval if PokeWatchdog is not called.
Other Parameters:
 
  • num_workers – Specifies the number of workers to create. If unspecified, only one is created. This is a legacy feature that exists to deal with kernels that can not be executed in the same process due to shared global state. By spawning multiple worker processes and using a single pipeline per worker, this limitation can be avoided.
  • db – This is for internal usage only.
Returns:

A C++ database instance.

Return type:

Database
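These module-level functions can be used to bring up a cluster by hand instead of letting Database(start_cluster=True) do it. A single-machine sketch (the port is illustrative):

```python
def start_local_cluster():
    from scannerpy.database import start_master, start_worker

    # Bring up a master, then attach one worker to it.
    master = start_master(port=5001, block=False)
    worker = start_worker('localhost:5001', block=False)
    return master, worker
```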

scannerpy.database.worker_process(args)[source]

scannerpy.config module

class scannerpy.config.Config(config_path=None, db_path=None)[source]

Bases: object

static default_config()[source]
static default_config_path()[source]
scannerpy.config.mkdir_p(path)[source]
scannerpy.config.read_line(s)[source]

scannerpy.op module

class scannerpy.op.Op(db, name, inputs, device, batch=-1, warmup=-1, stencil=[0], args={}, extra=None)[source]

Bases: object

inputs()[source]
outputs()[source]
to_proto(indices)[source]
class scannerpy.op.OpColumn(db, op, col, typ)[source]

Bases: object

compress(codec='video', **kwargs)[source]
compress_default()[source]
compress_video(quality=-1, bitrate=-1, keyframe_distance=-1)[source]
lossless()[source]
class scannerpy.op.OpGenerator(db)[source]

Bases: object

Creates Op instances to define a computation.

When a particular op is requested from the generator, e.g. db.ops.Histogram, the generator does a dynamic lookup for the op in a C++ registry.

scannerpy.op.register_python_op(name=None, stencil=None, unbounded_state=False, bounded_state=None, device_type=None, device_sets=None, batch=1, proto_path=None)[source]

Class or function decorator which registers a new Op and Kernel with the Scanner master.

Parameters:
  • name (Optional[str]) – Optional name for the Op. By default, it will be inferred as the name of the decorated class/kernel.
  • stencil (Optional[List[int]]) – Specifies the default stencil to use for the Op. If None, indicates that the Op does not have the ability to stencil. A stencil of [0] should be specified if the Op can stencil but should not by default.
  • unbounded_state (bool) – If true, indicates that the Op needs to see all previous elements of its input sequences before it can compute a given element. For example, to compute output element at index 100, the Op must have already produced elements 0-99. This option is mutually exclusive with bounded_state.
  • bounded_state (Optional[int]) – If specified, indicates that the Op needs to see the previous bounded_state elements of its input sequences before it can compute a given element. For example, with bounded_state=2, to compute the output element at index 100 the Op must have already seen elements 98 and 99. This option is mutually exclusive with unbounded_state.
  • device_type (Optional[DeviceType]) –
  • device_sets (Optional[List[Tuple[DeviceType, int]]]) –
  • batch (int) –
  • proto_path (Optional[str]) – Optional path to the proto file that describes the configuration arguments to this Op.
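In practice this decorator replaces the explicit register_op/register_python_kernel pair; a sketch using type annotations to mark frame columns (the channel-averaging scheme is illustrative, not a standard luma conversion):

```python
def define_grayscale_op():
    import numpy as np
    import scannerpy
    from scannerpy import FrameType

    # The op name 'Grayscale' is inferred from the function name.
    @scannerpy.register_python_op()
    def Grayscale(config, frame: FrameType) -> FrameType:
        gray = frame.mean(axis=2, keepdims=True)
        return np.repeat(gray, 3, axis=2).astype(frame.dtype)

    return Grayscale
```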

scannerpy.streams module

class scannerpy.streams.StreamsGenerator(db)[source]

Bases: object

Provides Ops for sampling elements from streams.

The methods of this class construct Scanner Ops that enable selecting subsets of the elements in a stream to produce new streams.

This class should not be constructed directly, but accessed via a Database object like:

db.streams.Range(input)
All(input)[source]

Samples all elements from the stream.

Serves as an identity sampling function.

Parameters:input (OpColumn) – The stream to sample.
Returns:The sampled stream.
Return type:scannerpy.op.OpColumn
Gather(input, rows=None)[source]

Samples a list of elements from the input stream.

Parameters:
  • input (OpColumn) – The stream to sample.
  • rows (Optional[List[int]]) – The default list of rows to sample.
Returns:

The sampled stream.

Return type:

scannerpy.op.OpColumn

Range(input, start=None, end=None)[source]

Samples a range of elements from the input stream.

Parameters:
  • input (OpColumn) – The stream to sample.
  • start (Optional[int]) – The default index to start sampling from.
  • end (Optional[int]) – The default index to end sampling at.
Returns:

The sampled stream.

Return type:

scannerpy.op.OpColumn

Ranges(input, intervals=None)[source]

Samples multiple ranges of elements from the input stream.

Parameters:
  • input (OpColumn) – The stream to sample.
  • intervals (Optional[Sequence[Tuple[int, int]]]) – The default intervals to sample from. This should be a list of tuples representing start and end ranges.
Returns:

The sampled stream.

Return type:

scannerpy.op.OpColumn

Examples

For example, to select frames 0-10 and 100-200, you would write:

db.streams.Ranges(input=input, intervals=[(0, 11), (100, 201)])

Repeat(input, spacing=None)[source]

Expands a sequence by repeating elements.

Parameters:
  • input (OpColumn) – The stream to expand.
  • spacing (Optional[int]) – The default spacing to expand the stream by.
Returns:

The sampled stream.

Return type:

scannerpy.op.OpColumn

RepeatNull(input, spacing=None)[source]

Expands a sequence by inserting nulls.

Parameters:
  • input (OpColumn) – The stream to expand.
  • spacing (Optional[int]) – The default spacing to expand the stream by.
Returns:

The sampled stream.

Return type:

scannerpy.op.OpColumn

Slice(input, partitioner=None)[source]

Partitions a stream into independent substreams.

Parameters:
  • input (OpColumn) – The stream to partition.
  • partitioner – The partitioner that should be used to split the stream into substreams.
Returns:

A new stream which represents multiple substreams.

Return type:

scannerpy.op.OpColumn

Stride(input, stride=None)[source]

Samples every nth element from the stream, where n is the stride.

Parameters:
  • input (OpColumn) – The stream to sample.
  • stride (Optional[int]) – The default value to stride by for all jobs.
Returns:

The sampled stream.

Return type:

scannerpy.op.OpColumn

StridedRange(input, start=None, end=None, stride=None)[source]

Samples a strided range of elements from the input stream.

Parameters:
  • input (OpColumn) – The stream to sample.
  • start (Optional[int]) – The default index to start sampling from.
  • end (Optional[int]) – The default index to end sampling at.
  • stride (Optional[int]) – The default value to stride by.
Returns:

The sampled stream.

Return type:

scannerpy.op.OpColumn

StridedRanges(input, intervals=None, stride=None)[source]

Samples strided ranges of elements from the input stream.

Parameters:
  • input (OpColumn) – The stream to sample.
  • intervals (Optional[Sequence[Tuple[int, int]]]) – The default intervals to sample from. This should be a list of tuples representing start and end ranges.
  • stride (Optional[int]) – The default value to stride by.
Returns:

The sampled stream.

Return type:

scannerpy.op.OpColumn

Unslice(input)[source]

Joins substreams back together.

Parameters:input (OpColumn) – The stream which contains substreams to join back together.
Returns:A new stream which is the concatenation of the input substreams.
Return type:scannerpy.op.OpColumn
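Slice and Unslice are typically used as a pair: partition, process the substreams independently, then rejoin. A sketch (the Histogram op and 1000-element group size are illustrative):

```python
def slice_and_rejoin(db, frame):
    # Split the stream into independent 1000-element substreams so no
    # state flows across chunk boundaries...
    sliced = db.streams.Slice(frame, partitioner=db.partitioner.all(1000))
    processed = db.ops.Histogram(frame=sliced)
    # ...then stitch the results back into a single stream.
    return db.streams.Unslice(processed)
```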

scannerpy.source module

class scannerpy.source.Source(db, name, source_args={})[source]

Bases: object

outputs()[source]
to_proto(indices)[source]
class scannerpy.source.SourceGenerator(db)[source]

Bases: object

Creates Source instances to define a computation.

When a particular Source is requested from the generator, e.g. db.sources.Column, the generator does a dynamic lookup for the Source in the server's registry.

scannerpy.sink module

class scannerpy.sink.Sink(db, name, inputs, sink_args={})[source]

Bases: object

inputs()[source]
to_proto(indices)[source]
class scannerpy.sink.SinkGenerator(db)[source]

Bases: object

Creates Sink instances to define a computation.

When a particular Sink is requested from the generator, e.g. db.sinks.Column, the generator does a dynamic lookup for the Sink in the server's registry.

scannerpy.job module

class scannerpy.job.Job(op_args)[source]

Bases: object

A specification of a table to produce as output of a bulk job.

op_args()[source]

scannerpy.column module

class scannerpy.column.Column(table, name)[source]

Bases: object

A column of a Table.

id()[source]
keyframes()[source]
load(fn=None, rows=None)[source]

Loads the results of a Scanner computation into Python.

Parameters:
  • fn – Optional function to apply to the binary blobs as they are read in.
  • rows – Optional list of rows to load.
Returns:Generator that yields either a numpy array for frame columns or a binary blob for non-frame columns (optionally processed by fn).
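A sketch of pulling a computed column back into Python, parsing each blob with a deserialization function (the table name and histogram shape are illustrative):

```python
def load_histograms(db):
    import numpy as np

    def parse_hist(buf):
        # Interpret each blob as a flat int32 array (three 16-bin channels).
        return np.frombuffer(buf, dtype=np.int32).reshape(3, 16)

    col = db.table('example_hist').column('histogram')
    return list(col.load(fn=parse_hist))
```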
name()[source]
save_mp4(output_name, fps=None, scale=None)[source]
type()[source]

scannerpy.common module

class scannerpy.common.ColumnType[source]

Bases: enum.Enum

Enum for specifying what the type of a column is.

Blob = 0
Video = 1
to_proto = <function ColumnType.to_proto>[source]
class scannerpy.common.DeviceHandle(device, device_id)[source]

Bases: object

class scannerpy.common.DeviceType[source]

Bases: enum.Enum

Enum for specifying where an Op should run.

CPU = 0
GPU = 1
to_proto = <function DeviceType.to_proto>[source]
class scannerpy.common.FrameType[source]

Bases: object

exception scannerpy.common.ScannerException[source]

Bases: Exception

scannerpy.kernel module

class scannerpy.kernel.Kernel(config)[source]

Bases: object

close()[source]
execute(input_columns)[source]
new_stream(args)[source]
reset()[source]
class scannerpy.kernel.KernelConfig(config)[source]

Bases: object

scannerpy.partitioner module

class scannerpy.partitioner.TaskPartitioner(db)[source]

Bases: object

Utility for specifying how to partition the output domain of a job into tasks.

all(group_size=250)[source]
gather(groups)[source]
range(start, end)[source]
ranges(intervals)[source]
strided(stride, group_size=250)[source]
strided_range(start, end, stride)[source]
strided_ranges(intervals, stride)[source]

scannerpy.profiler module

class scannerpy.profiler.Profiler(db, job_id)[source]

Bases: object

Contains profiling information about Scanner jobs.

statistics()[source]
total_time_interval()[source]
write_trace(path)[source]

Generates a trace file in Chrome format.

To visualize the trace, visit chrome://tracing in Google Chrome and click “Load” in the top left to load the trace.

Parameters:path (str) – Output path to write the trace.
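A sketch of collecting and exporting a profile after a job completes (the job name and output path are illustrative):

```python
def export_trace(db, job_name='example_hist'):
    prof = db.profiler(job_name)
    # Write a Chrome-format trace viewable at chrome://tracing.
    prof.write_trace('job.trace')
    return prof.total_time_interval()
```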
scannerpy.profiler.read_advance(fmt, buf, offset)[source]
scannerpy.profiler.unpack_string(buf, offset)[source]

scannerpy.protobuf_generator module

class scannerpy.protobuf_generator.ProtobufGenerator(cfg)[source]

Bases: object

add_module(path)[source]
scannerpy.protobuf_generator.python_to_proto(protos, proto_name, obj)[source]

scannerpy.sampler module

class scannerpy.sampler.Sampler(db)[source]

Bases: object

Utility for specifying which frames of a video (or which rows of a table) to run a computation over.

All(input)[source]
Gather(input, rows=None)[source]
Range(input, start=None, end=None)[source]
Ranges(input, intervals=None)[source]
Repeat(input, spacing=None)[source]
RepeatNull(input, spacing=None)[source]
Stride(input, stride=None)[source]
StridedRange(input, start=None, end=None, stride=None)[source]
StridedRanges(input, intervals=None, stride=None)[source]

scannerpy.table module

class scannerpy.table.Table(db, name, id)[source]

Bases: object

A table in a Database.

Can be part of many Collection objects.

column(name)[source]
column_names()[source]
committed()[source]
id()[source]
load(columns, fn=None, rows=None)[source]
name()[source]
num_rows()[source]
parent_rows()[source]
profiler()[source]

Module contents