Core objects

Parameters

class concert.base.Parameter(fget=None, fset=None, fget_target=None, data=None, check=None, help=None)

A parameter with getter and setter.

Parameters are similar to normal Python properties and can additionally trigger state checks. If fget or fset is not given, you must implement the accessor functions named _set_name and _get_name:

from concert.base import Parameter, State, check

class SomeClass(object):

    state = State(default='standby')
    param = Parameter(check=check(source='standby', target=['standby', 'moving']))

    def _set_param(self, value):
        pass

    def _get_param(self):
        pass

When a Parameter is attached to a class, you can modify it by accessing its associated ParameterValue with a dictionary access:

obj = SomeClass()
print(obj['param'])

fget is a callable that is called when reading the parameter. fset is called when the parameter is written to. fget_target is a getter for the target value. fget, fset, fget_target must be member functions of the corresponding Parameterizable object.

data is passed to the state check function.

check is a check() that changes states when a value is written to the parameter.

help is a string describing the parameter in more detail.

class concert.base.ParameterValue(instance, parameter)

Value object of a Parameter.

await get(wait_on=None)

Get coroutine obtaining the concrete value of this object.

If wait_on is not None, it must be an awaitable on which this method waits.

await get_target(wait_on=None)

Get coroutine obtaining target value of this object.

If wait_on is not None, it must be an awaitable on which this method waits.

lock(permanent=False)

Lock parameter for writing. If permanent is True, the parameter cannot be unlocked anymore.

property locked

Return True if the parameter is locked for writing.

await restore()

Restore the last value saved with ParameterValue.stash().

If the parameter can only be read or no value has been saved, this operation does nothing.

await set(value, wait_on=None)

Set concrete value on the object.

If wait_on is not None, it must be an awaitable on which this method waits.

await stash()

Save the current value internally on a growing stack.

If the parameter is writable, the current value is saved on a stack and can later be retrieved with ParameterValue.restore().

unlock()

Unlock parameter for writing.

await wait(value, sleep_time=<Quantity(0.1, 'second')>, timeout=None)

Wait until the parameter value is value. sleep_time is the time to sleep between consecutive checks. timeout specifies the maximum waiting time.

property writable

Return True if the parameter is writable.

class concert.base.Quantity(unit, fget=None, fset=None, fget_target=None, lower=None, upper=None, data=None, check=None, external_lower_getter=None, external_upper_getter=None, user_lower_getter=None, user_lower_setter=None, user_upper_getter=None, user_upper_setter=None, help=None)

Bases: Parameter

A Parameter associated with a unit.

fget, fset, data, check and help are identical to the Parameter constructor arguments.

unit is a Pint quantity. lower and upper denote the soft limits between which the Quantity values can lie.

class concert.base.QuantityValue(instance, quantity)

Bases: ParameterValue

await get(wait_on=None)

Get coroutine obtaining the concrete value of this object.

If wait_on is not None, it must be an awaitable on which this method waits.

lock_limits(permanent=False)

Lock limits. If permanent is True, the limits cannot be unlocked anymore.

await set(value, wait_on=None)

Set concrete value on the object.

If wait_on is not None, it must be an awaitable on which this method waits.

unlock_limits()

Unlock limits.

await wait(value, eps=None, sleep_time=<Quantity(0.1, 'second')>, timeout=None)

Wait until the parameter value is value. eps is the allowed discrepancy between the actual value and value. sleep_time is the time to sleep between consecutive checks. timeout specifies the maximum waiting time.

Collection of parameters

class concert.base.Parameterizable(self)

Collection of parameters.

For each class of type Parameterizable, Parameter objects can be set as class attributes:

class Device(Parameterizable):

    def get_something(self):
        return 'something'

    something = Parameter(get_something)

There are two kinds of parameters: a simple Parameter and a Quantity, which models a physical quantity.

A Parameterizable is iterable and returns its parameters of type ParameterValue or its subclasses

for param in device:
    print("name={}".format(param.name))

To access a single parameter object by name, you can use the [] operator:

param = device['position']

If the parameter name does not exist, a ParameterError is raised.

Each parameter value is accessible as a property. If a device has a position it can be read and written with:

device.position = 0 * q.mm
print(device.position)

install_parameters(params)

Install parameters at run-time.

params is a dictionary mapping parameter names to Parameter objects.

lock(permanent=False)

Lock all the parameters for writing. If permanent is True, the parameters cannot be unlocked anymore.

await restore()

Restore all parameters saved with Parameterizable.stash().

await stash()

Save all writable parameters that can be restored with Parameterizable.restore().

The values are stored on a stack, hence subsequently saved states can be restored one by one.
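The stack behaviour described above can be illustrated with a small stand-alone sketch. It does not use Concert: StashDemo and its attributes are hypothetical names, and the real stash() and restore() are coroutines that operate on device parameters.

```python
class StashDemo:
    """Hypothetical stand-in that mimics stack-based stash/restore."""

    def __init__(self, value):
        self.value = value
        self._stack = []  # saved values, most recent last

    def stash(self):
        # Push the current value onto the stack.
        self._stack.append(self.value)

    def restore(self):
        # Pop the most recently saved value; do nothing if none was saved.
        if self._stack:
            self.value = self._stack.pop()


demo = StashDemo(1)
demo.stash()        # saves 1
demo.value = 2
demo.stash()        # saves 2
demo.value = 3
demo.restore()      # back to 2
first = demo.value
demo.restore()      # back to 1
second = demo.value
```

Each restore() undoes exactly one stash(), so nested code can save and restore state without knowing about outer callers.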

unlock()

Unlock all the parameters for writing.

class concert.base.RunnableParameterizable(self)

State machine

class concert.base.State(default=None, fget=None, fset=None, data=None, check=None, help=None)

Finite state machine.

Use this on a class, to keep some sort of known state. In order to enforce restrictions, you would decorate methods on the class with check():

class SomeObject(object):

    state = State(default='standby')

    @check(source='*', target='moving')
    def move(self):
        pass

In case your device doesn’t provide information on its state, you can use transition() to store the state in an instance of your device:

@transition(immediate='moving', target='standby')
def _set_some_param(self, param_value):
    # when the method starts device state is set to *immediate*
    # long operation goes here
    pass
    # the state is set to *target* in the end

Accessing the state variable will return the current state value, i.e.:

obj = SomeObject()
assert obj.state == 'standby'

The state cannot be set explicitly by:

obj.state = 'some_state'

Instead, the object needs to provide methods which transition out of states; the same holds for transitioning out of error states. If the _get_state() method is implemented in the device, it is always used to get the state; otherwise the state is stored in software.

fget is a callable that is called when reading the parameter. fset is called when the parameter is written to. fget_target is a getter for the target value. fget, fset, fget_target must be member functions of the corresponding Parameterizable object.

data is passed to the state check function.

check is a check() that changes states when a value is written to the parameter.

help is a string describing the parameter in more detail.

concert.base.check(source: Union[str, List[str]] = '*', target: Union[str, List[str]] = '*', state_name='state')

Decorates a method for checking the device state.

source denotes the source state (or a list of possible source states) that must be present at the time of invoking the decorated method. target is the state that the state object will be in after successful completion of the method, or a list of possible target states. state_name can be used to select the state if multiple states are used.
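The source/target rule can be sketched with a simplified, synchronous imitation. This is not Concert's implementation: the check and StateError below are local stand-ins, Motor is a hypothetical class, and the state is a plain attribute rather than a State object.

```python
import functools


class StateError(Exception):
    """Local stand-in for concert.base.StateError."""


def check(source='*', target='*', state_name='state'):
    """Simplified, synchronous imitation of a state-checking decorator."""
    sources = [source] if isinstance(source, str) else list(source)
    targets = [target] if isinstance(target, str) else list(target)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            current = getattr(self, state_name)
            # '*' accepts any state; otherwise the current state must be listed.
            if '*' not in sources and current not in sources:
                raise StateError(
                    "cannot call {} in state {!r}".format(func.__name__, current))
            result = func(self, *args, **kwargs)
            final = getattr(self, state_name)
            if '*' not in targets and final not in targets:
                raise StateError(
                    "unexpected state {!r} after {}".format(final, func.__name__))
            return result
        return wrapper
    return decorator


class Motor:
    state = 'standby'

    @check(source='standby', target='moving')
    def move(self):
        self.state = 'moving'


motor = Motor()
motor.move()             # allowed: standby -> moving
try:
    motor.move()         # rejected: the source state is now 'moving'
    rejected = False
except StateError:
    rejected = True
```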

concert.base.transition(immediate=None, target=None, state_name='state')

Change software state of a device to immediate. After the function execution finishes change the state to target. On asyncio.CancelledError, state is set to target and cleanup logic must take place in the callable to be wrapped. state_name can be used to define the state if multiple states are used.
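A minimal sketch of this behaviour, assuming the state is kept in an ordinary instance attribute. The transition below is a local imitation written for illustration, and Stage is a hypothetical device class; Concert's own decorator may differ in detail.

```python
import asyncio
import functools


def transition(immediate=None, target=None, state_name='state'):
    """Simplified imitation: store the state in an instance attribute."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            # The state becomes *immediate* as soon as the call starts.
            setattr(self, state_name, immediate)
            try:
                result = await func(self, *args, **kwargs)
            except asyncio.CancelledError:
                # On cancellation the state is also set to *target*.
                setattr(self, state_name, target)
                raise
            setattr(self, state_name, target)
            return result
        return wrapper
    return decorator


class Stage:
    state = 'standby'

    def __init__(self):
        self.seen = []

    @transition(immediate='moving', target='standby')
    async def move(self):
        self.seen.append(self.state)   # state observed while the operation runs
        await asyncio.sleep(0)


stage = Stage()
asyncio.run(stage.move())
```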

Devices

class concert.devices.base.Device(self)

Bases: Parameterizable

A Device provides locked access to a real-world device.

It implements the context protocol to provide locking:

async with device:
    # device is locked
    await device.set_parameter(1 * q.m)
    ...

# device is unlocked again

await emergency_stop()

Emergency stop.

Asynchronous execution

exception concert.coroutines.base.WaitError

Raised on busy waiting timeouts

concert.coroutines.base.background(corofunc)

Same as start(), just meant to be used as a decorator.

concert.coroutines.base.broadcast(producer, *consumers)

Feed producer to all consumers.

await concert.coroutines.base.ensure_coroutine(func, *args, **kwargs)

func(*args, **kwargs) returns an awaitable which is wrapped here into a real coroutine. This is useful for turning futures from other libraries, like Tango, into real coroutines.

await concert.coroutines.base.feed_queue(producer, func, *args)

Feed function func with items from producer in a separate thread. The signature must be func(queue, *args), where elements in the queue are instances of concert.helpers.PrioItem.

concert.coroutines.base.get_event_loop()

Get asyncio’s event loop.

await concert.coroutines.base.run_in_executor(func, *args)

Run a blocking function func with signature func(*args) in an executor. If run_in_executor() is cancelled, it blocks until func finishes so that all resources are in consistent state.
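The underlying idea can be shown with plain asyncio, without Concert. The sketch below uses the standard loop.run_in_executor() to keep the event loop responsive while a blocking call runs; blocking_read is a hypothetical function, and the cancellation behaviour described above is not reproduced here.

```python
import asyncio
import time


def blocking_read(delay):
    """Hypothetical blocking call, e.g. a slow driver function."""
    time.sleep(delay)
    return 'done'


async def main():
    loop = asyncio.get_running_loop()
    # None selects the loop's default executor (a thread pool).
    return await loop.run_in_executor(None, blocking_read, 0.01)


result = asyncio.run(main())
```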

concert.coroutines.base.run_in_loop(coroutine, error_msg_if_running=None)

Wrap coroutine into an asyncio.Task, run it in the current loop, block until it finishes and return the result. On KeyboardInterrupt, the task is cancelled. If the loop is already running, raise RuntimeError with the message error_msg_if_running if it is given; otherwise Python will take care of the error reporting.

concert.coroutines.base.start(awaitable)

Wrap awaitable into a task and start its execution right away. The returned task will also be cancellable by ctrl-k.

await concert.coroutines.base.wait_until(condition, sleep_time=<Quantity(0.1, 'second')>, timeout=None)

Wait until a callable condition returns True. sleep_time is the time to sleep between consecutive checks of condition. If timeout is given and the condition doesn’t return True within the time specified by it, a WaitError is raised.
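The polling loop can be sketched as follows. This is an illustration only: times are plain seconds instead of quantities, condition is assumed to be an ordinary synchronous callable, and WaitError is a local stand-in for the exception documented above.

```python
import asyncio
import time


class WaitError(Exception):
    """Local stand-in for concert.coroutines.base.WaitError."""


async def wait_until(condition, sleep_time=0.1, timeout=None):
    """Poll condition() until it returns True; times are plain seconds here."""
    start = time.monotonic()
    while not condition():
        if timeout is not None and time.monotonic() - start > timeout:
            raise WaitError("condition not met within {} s".format(timeout))
        # Yield to the event loop between checks instead of busy waiting.
        await asyncio.sleep(sleep_time)


async def demo():
    counter = {'n': 0}

    def ready():
        counter['n'] += 1
        return counter['n'] >= 3

    await wait_until(ready, sleep_time=0.01)
    try:
        await wait_until(lambda: False, sleep_time=0.01, timeout=0.05)
        timed_out = False
    except WaitError:
        timed_out = True
    return counter['n'], timed_out


polls, timed_out = asyncio.run(demo())
```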

Exceptions

class concert.base.UnitError

Raised when an operation is passed a value with an incompatible unit.

class concert.base.LimitError

Raised when an operation is passed a value that exceeds a limit.

class concert.base.ParameterError(parameter)

Raised when a parameter is accessed that does not exist.

class concert.base.AccessorNotImplementedError

Raised when a setter or getter is not implemented.

class concert.base.ReadAccessError(parameter)

Raised when user tries to read a parameter that cannot be read.

class concert.base.WriteAccessError(parameter)

Raised when user tries to write a parameter that cannot be written.

class concert.base.StateError(error_state, msg=None)

Raised in state check functions of devices.

Configuration

concert.config.MOTOR_VELOCITY_SAMPLING_TIME

Time step for calculation of motor velocity by measuring two position values. Longer values will create more accurate results but reading the velocity will take more time.

concert.config.PROGRESS_BAR

Turn on the progress bar for long-lasting operations if the tqdm package is present.

Sessions

class concert.session.utils.SubCommand(name, opts)

Base sub-command class (concert [subcommand]).

SubCommand objects are loaded at run-time and injected into Concert’s command parser.

name denotes the name of the sub-command parser, e.g. “mv” for the MoveCommand. opts must be an argparse-compatible dictionary of command options.

abstractmethod run(*args, **kwargs)

Run the command

concert.session.utils.abort_awaiting(skip=None)

Cancel background tasks. skip are coroutine names which are not cancelled.

await concert.session.utils.check_emergency_stop(check, poll_interval=0.1 * q.s, exit_session=False)

If the callable check returns True, abort is called. Nothing further is done until check returns False again, after which the process starts over. poll_interval is the interval at which check is called. If exit_session is True, the session exits when the emergency stop occurs.

concert.session.utils.code_of(func)

Show implementation of func.

concert.session.utils.ddoc()

Render device documentation.

concert.session.utils.dstate()

Render device state in a table.

concert.session.utils.get_default_table(field_names, widths=None)

Return a prettytable styled for use in the shell. field_names is a list of table header strings.

concert.session.utils.pdoc(hide_blacklisted=True)

Render process documentation.

Networking

The networking package facilitates all network connections, e.g. sockets and Tango.

Socket Connections

class concert.networking.base.SocketConnection(host, port, return_sequence='\n')

A two-way socket connection. return_sequence is a string appended after every command indicating the end of it; the default value is a newline (\n).

await close()

Close connection.

await connect()

Open connection.

await execute(data, num=1024)

Execute command and wait for response (coroutine-safe, not thread-safe). Read num bytes from the socket.

await recv(num=1024)

Read num bytes from the socket. The trailing return sequence characters are stripped from the result before it is returned.

await send(data)

Send data to the peer. The return sequence characters are appended to the data before it is sent.
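The handling of the return sequence in send() and recv() can be illustrated without a socket. The helpers frame and unframe are hypothetical names introduced here, and 'POS?' is a made-up command; the real methods also perform the network I/O.

```python
RETURN_SEQUENCE = '\n'


def frame(data, return_sequence=RETURN_SEQUENCE):
    """Append the return sequence to outgoing data, as send() is described to do."""
    return data + return_sequence


def unframe(data, return_sequence=RETURN_SEQUENCE):
    """Strip trailing return-sequence characters, as recv() is described to do."""
    return data.rstrip(return_sequence)


outgoing = frame('POS?')
incoming = unframe('12.5\n')
```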

TANGO

Tango devices are interfaced through PyTango. One can obtain the DeviceProxy with the get_tango_device() function.

concert.networking.base.get_tango_device(uri, peer=None, timeout=<Quantity(10, 'second')>)

Get a Tango device by specifying its uri. If peer is given, change the tango_host specifying which database to connect to. The format is host:port as a string. timeout sets the device’s general timeout. It is converted to milliseconds, then to an integer, and the Tango device’s set_timeout_millis is called with the resulting value.

Helpers

class concert.helpers.Bunch(values)

Encapsulate a list or dictionary to provide attribute-like access.

Common use cases look like this:

d = {'foo': 123, 'bar': 'baz'}
b = Bunch(d)
print(b.foo)
>>> 123

l = ['foo', 'bar']
b = Bunch(l)
print(b.foo)
>>> 'foo'

class concert.helpers.CommData(host: str, port: int = None, protocol: str = 'tcp', socket_type: SocketType = SocketType.PUSH, sndhwm: int = 1000)

Encapsulates communication metadata.

class concert.helpers.ImageWithMetadata(input_array: ndarray, metadata: dict | None = None)

Subclass of numpy.ndarray with a metadata dictionary, holding an image together with its metadata.

Parameters
  • input_array – Input array (can be an instance of ImageWithMetadata)

  • metadata – metadata

class concert.helpers.PerformanceTracker(summary=False, loglevel=8)

A stopwatch with the ability to report data throughput.

Parameters
  • summary – if True, output everything at the end, otherwise immediately on each lap() call

  • loglevel – logging level

class concert.helpers.PrioItem(priority: int, data: Any)

To be used in combination with queue.PriorityQueue.
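A sketch of how such an item works with queue.PriorityQueue. The dataclass below is a local stand-in with the two documented fields; that only priority takes part in the ordering is an assumption made for this illustration.

```python
import queue
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class PrioItem:
    """Stand-in with the documented fields; only priority is compared."""
    priority: int
    data: Any = field(compare=False)


pq = queue.PriorityQueue()
pq.put(PrioItem(priority=2, data='second'))
pq.put(PrioItem(priority=1, data='first'))
pq.put(PrioItem(priority=3, data='third'))

# Items come out in ascending priority order, regardless of insertion order.
order = [pq.get().data for _ in range(3)]
```

Excluding data from the comparison means the payload does not need to be orderable itself, which matters for payloads such as arrays.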

concert.helpers.arange(start, stop, step)

This function wraps numpy.arange but strips the units beforehand and re-attaches the unit to the resulting numpy.array.

Parameters
  • start (concert.quantities.q.Quantity) –

  • stop (concert.quantities.q.Quantity) –

  • step (concert.quantities.q.Quantity) –

Returns

class concert.helpers.expects(*args, **kwargs)

Decorator which determines the expected arguments of the function and also checks the correctness of the given arguments. If the input arguments differ from the expected ones, a TypeError is raised.

For numeric arguments use Numeric class with 2 parameters: dimension of the array and units (optional). E.g. “Numeric (1)” means function expects one number or “Numeric (2, q.mm)” means function expects expression like [4,5]*q.mm

Common use case looks like this:

from concert.helpers import Numeric, expects

@expects(Camera, LinearMotor, pixelsize=Numeric(2, q.mm))
def foo(camera, motor, pixelsize=None):
    pass

concert.helpers.is_iterable(item)

Is item iterable or not.

concert.helpers.linspace(start, stop, num, endpoint=True)

This function wraps numpy.linspace but strips the units beforehand and re-attaches the unit to the resulting numpy.array.

Parameters
  • start (concert.quantities.q.Quantity) – First value

  • stop (concert.quantities.q.Quantity) –

  • num (int) –

  • endpoint (bool) –

Returns

numpy.array with the length num and entries equally distributed within start and stop.

concert.helpers.measure(func=None, return_result=False)

Measure and print execution time of func.

If return_result is True, the decorated function returns a tuple consisting of the original return value and the measured time in seconds.
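The return_result behaviour can be sketched with a simplified imitation for synchronous functions. This measure is a local stand-in written for illustration, not Concert's implementation, and add is a hypothetical example function.

```python
import functools
import time


def measure(func=None, return_result=False):
    """Simplified imitation for synchronous functions."""
    if func is None:
        # Called with arguments, e.g. @measure(return_result=True).
        return functools.partial(measure, return_result=return_result)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print("{} took {:.6f} s".format(func.__name__, duration))
        # Optionally hand the measured time back to the caller.
        return (result, duration) if return_result else result

    return wrapper


@measure(return_result=True)
def add(a, b):
    return a + b


value, seconds = add(1, 2)
```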

concert.helpers.memoize(func)

Memoize the result of func.

Remember the result of func depending on its arguments. Note that this requires the function to be free from any side effects, i.e. it must return the same value given the same arguments.
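A minimal sketch of the technique, assuming positional, hashable arguments only. The memoize below is a local imitation and square is a hypothetical example; the calls list exists purely to make real evaluations visible.

```python
import functools


def memoize(func):
    """Cache results keyed by positional arguments (which must be hashable)."""
    cache = {}

    @functools.wraps(func)
    def wrapper(*args):
        if args not in cache:
            cache[args] = func(*args)
        return cache[args]

    return wrapper


calls = []


@memoize
def square(x):
    calls.append(x)      # side effect used only to count real evaluations
    return x * x


results = [square(4), square(4), square(5)]
```

The second square(4) is served from the cache, which is why a function with meaningful side effects would misbehave when memoized.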

Storage

storage.py

Storage implementations.

class concert.storage.DirectoryWalker(self, root: Optional[str] = None, dsetname: str = 'frame_{:>06}.tif', writer: Type[TiffWriter] = <class 'concert.writers.TiffWriter'>, start_index: int = 0, bytes_per_file: int = 0, rights: str = '750')

A DirectoryWalker moves through a file system and writes flat files using a specific filename template.

await __ainit__(root: ~typing.Optional[str] = None, dsetname: str = 'frame_{:>06}.tif', writer: ~typing.Type[~concert.writers.TiffWriter] = <class 'concert.writers.TiffWriter'>, start_index: int = 0, bytes_per_file: int = 0, rights: str = '750') None

Use writer to write data to files with filenames with a template from dsetname. start_index specifies the number in the first file name, e.g. for the default dsetname and start_index 100, the first file name will be frame_000100.tif.
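The file naming can be reproduced with plain str.format. The snippet assumes, as the description suggests, that the running file number is passed as the single positional argument to the template.

```python
dsetname = 'frame_{:>06}.tif'   # default template from the signature above
start_index = 100

# File names for the first three images of a data set.
names = [dsetname.format(start_index + i) for i in range(3)]
```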

await exists(*paths: str) bool

Check if paths exist.

await log_to_json(payload: str, filename: str = 'experiment.json') None

Logs experiment metadata as payload to a file called experiment.json

NOTE: This method does not have to be a coroutine. We still made it so to maintain coherence at the api level. With the unification of the top layer walker api this concern would be addressed.

Parameters

payload (str) – content to write

await register_logger(logger_name: str, log_level: int, file_name: str) AsyncLoggingHandlerCloser

Registers a logger with walker device server and provides a remote logging handler

class concert.storage.DummyWalker(self, root: str = '')

Walker object used for testing purposes

await __ainit__(root: str = '') None

Constructor. root is the topmost level of the data structure

Parameters
  • root (str) – topmost level of the data structure

  • dsetname (str) – dataset name

await exists(*paths) bool

Return True if path from current position specified by a list of paths exists.

await log_to_json(payload: str, filename: str = 'experiment.json') None

Provides local counterpart of the remote logging of experiment metadata. Writes the provided payload to a static file called experiment.json.

Parameters

payload (str) – content to write

await register_logger(logger_name: str, log_level: int, file_name: str) AsyncLoggingHandlerCloser

Provides a no-op logging handler as a placeholder

class concert.storage.RemoteDirectoryWalker(self, device: RemoteDirectoryWalkerTangoDevice, root: Optional[str] = None, dsetname: str = 'frame_{:>06}.tif', wrt_cls: str = 'TiffWriter', start_index: int = 0, bytes_per_file: int = 0)

Defines the api layer of a directory walker for a remote file system. Encapsulates a Tango device which runs on the remote file system where the data needs to be written.

await __ainit__(device: RemoteDirectoryWalkerTangoDevice, root: Optional[str] = None, dsetname: str = 'frame_{:>06}.tif', wrt_cls: str = 'TiffWriter', start_index: int = 0, bytes_per_file: int = 0) None

Initializes a remote directory walker. This walker implementation encapsulates a Tango device server and delegates its core utilities to it.

Parameters
  • device (RemoteDirectoryWalkerTangoDevice) – an abstract Tango device conforming to the remote Tango walker specification

  • root (Optional[str]) – file system root at which to start the traversal; if None, the current directory of the walker is used

  • dsetname (str) – template for writing files of the dataset

  • wrt_cls (str) – specific writer class which the device should use to write files

  • start_index (int) – number of the first file name in the dataset

  • bytes_per_file (int) – size limit for a file, 0 denotes 1 file per image

await exists(*paths: str) bool

Checks whether the specified paths exist in the file system.

Parameters

paths (str) – a given number of file system paths

Returns

whether the specified paths exist

Return type

bool

await home() None

Return to root remotely and inside its own context (which is implemented in the super class)

await log_to_json(payload: str, filename: str = 'experiment.json') None

Implements api layer for writing experiment metadata

await register_logger(logger_name: str, log_level: int, file_name: str) AsyncLoggingHandlerCloser

Provides a logging handler for the current path, capable of facilitating logging at a remote host.

await write_sequence(name: Optional[str] = '') None

Asynchronously writes a sequence in the provided path

Parameters

name (str) – path to write to

exception concert.storage.StorageError

Exception related to logical issues with storage

class concert.storage.Walker(self, root: str, dsetname: str = 'frames')

A Walker moves through an abstract hierarchy and allows writing data at a specific location.

await __ainit__(root: str, dsetname: str = 'frames') None

Constructor. root is the topmost level of the data structure

Parameters
  • root (str) – topmost level of the data structure

  • dsetname (str) – dataset name

await ascend() Walker

Ascend from current depth and return self.

await create_writer(producer: AsyncIterable[ArrayLike], name: Optional[str] = None, dsetname: Optional[str] = None) Awaitable

Create a writer coroutine for writing data set dsetname with images from producer inside. If name is given, descend to it first and, once the writer is created, ascend back. This way, the writer can operate in name and the walker can be safely used to move around and create other writers elsewhere while the created writer is working. The returned coroutine is not guaranteed to be wrapped into an asyncio.Task and hence to be started immediately. This function also does not block after creating the writer. This is useful for splitting the preparation of writing (creating directories, …) and the I/O itself.

await descend(name: str) Walker

Descend to name and return self.

abstractmethod await exists(*paths) bool

Return True if path from current position specified by a list of paths exists.

await home() None

Return to root
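The navigation methods descend(), ascend() and home() can be pictured with a small path-tracking sketch. PathWalker is a hypothetical, synchronous stand-in that only manipulates a path string; the real Walker methods are coroutines, and staying put when ascending at the root is an assumption made here.

```python
import posixpath


class PathWalker:
    """Hypothetical, synchronous stand-in that only tracks the current path."""

    def __init__(self, root):
        self._root = root
        self.current = root

    def descend(self, name):
        self.current = posixpath.join(self.current, name)
        return self

    def ascend(self):
        if self.current != self._root:      # never climb above root
            self.current = posixpath.dirname(self.current)
        return self

    def home(self):
        self.current = self._root


walker = PathWalker('/data')
walker.descend('scan').descend('darks')    # returning self allows chaining
deep = walker.current
walker.ascend()
up = walker.current
walker.home()
```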

abstractmethod await log_to_json(payload: str, filename: str = 'experiment.json') None

Provides local counterpart of the remote logging of experiment metadata. Writes the provided payload to a static file called experiment.json.

Parameters

payload (str) – content to write

abstractmethod await register_logger(logger_name: str, log_level: int, file_name: str) AsyncLoggingHandlerCloser

Registers a logger with walker device server and provides a remote logging handler

await write(producer: AsyncIterable[ArrayLike], dsetname: Optional[str] = None) Awaitable

Create a coroutine for writing data set dsetname with images from producer. The execution starts immediately in the background and await will block until the images are written.

concert.storage.create_directory(directory, rights='750')

Create directory and all paths along the way if necessary. rights are a string representing a combination for user, group, others.

concert.storage.read_image(filename)

Read image from file with filename. The file type is detected automatically.

concert.storage.read_tiff(file_name)

Read a TIFF file from disk using the tifffile module.

concert.storage.split_dsetformat(dsetname)

Strip the formatting part off dsetname, which leaves the data set name.

Parameters

dsetname (str) – dataset name

await concert.storage.write_images(producer: ~typing.AsyncIterable[~concert.typing.ArrayLike], writer: ~typing.Type[~concert.writers.TiffWriter] = <class 'concert.writers.TiffWriter'>, prefix: str = 'image_{:>05}.tif', start_index: int = 0, bytes_per_file: int = 0, rights: str = '750') int

Write images on disk with the specified writer and file name prefix. Write to one file until bytes_per_file bytes have been written. If it is 0, then one file per image is created. writer is a subclass of writers.ImageWriter. start_index specifies the number in the first file name, e.g. for the default prefix and start_index 100, the first file name will be image_00100.tif. If prefix is not formattable, images are appended to the file specified by prefix.

concert.storage.write_libtiff(file_name, data)

Write a TIFF file using pylibtiff. Return the written file name.

concert.storage.write_tiff(file_name, data)

The default TIFF writer, which uses the tifffile module. Return the written file name.