Core objects
Parameters
- class concert.base.Parameter(fget=None, fset=None, fget_target=None, data=None, check=None, help=None)
A parameter with getter and setter.
Parameters are similar to normal Python properties and can additionally trigger state checks. If fget or fset is not given, you must implement the accessor functions named _set_name and _get_name:
    from concert.base import Parameter, State, check

    class SomeClass(object):

        state = State(default='standby')
        param = Parameter(check=check(source='standby', target=['standby', 'moving']))

        def _set_param(self, value):
            pass

        def _get_param(self):
            pass
When a Parameter is attached to a class, you can modify it by accessing its associated ParameterValue with a dictionary access:
    obj = SomeClass()
    print(obj['param'])
fget is a callable that is called when reading the parameter. fset is called when the parameter is written to. fget_target is a getter for the target value. fget, fset, fget_target must be member functions of the corresponding Parameterizable object.
data is passed to the state check function.
check is a check() that changes states when a value is written to the parameter.
help is a string describing the parameter in more detail.
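The accessor naming convention described above can be illustrated without Concert. The following is a minimal sketch and not Concert's implementation: SketchParameter and Stage are hypothetical names, and the real Parameter is asynchronous and performs state checks that are left out here.

```python
class SketchParameter:
    """Hypothetical descriptor that looks up _get_<name> and _set_<name>."""

    def __set_name__(self, owner, name):
        # Remember the attribute name so the accessor names can be derived.
        self.name = name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        return getattr(instance, "_get_" + self.name)()

    def __set__(self, instance, value):
        getattr(instance, "_set_" + self.name)(value)


class Stage:
    position = SketchParameter()

    def __init__(self):
        self._position = 0.0

    def _get_position(self):
        return self._position

    def _set_position(self, value):
        self._position = value


stage = Stage()
stage.position = 2.5      # dispatches to Stage._set_position
print(stage.position)     # dispatches to Stage._get_position
```

The point of the sketch is only the name-based dispatch: no accessor is passed to the descriptor, so it falls back to methods named after the attribute.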
- class concert.base.ParameterValue(instance, parameter)
Value object of a Parameter.
- await get(wait_on=None)
Get coroutine obtaining the concrete value of this object.
If wait_on is not None, it must be an awaitable on which this method waits.
- await get_target(wait_on=None)
Get coroutine obtaining target value of this object.
If wait_on is not None, it must be an awaitable on which this method waits.
- lock(permanent=False)
Lock parameter for writing. If permanent is True the parameter cannot be unlocked anymore.
- property locked
Return True if the parameter is locked for writing.
- await restore()
Restore the last value saved with ParameterValue.stash().
If the parameter can only be read or no value has been saved, this operation does nothing.
- await set(value, wait_on=None)
Set concrete value on the object.
If wait_on is not None, it must be an awaitable on which this method waits.
- await stash()
Save the current value internally on a growing stack.
If the parameter is writable, the current value is saved on a stack so that it can later be retrieved with ParameterValue.restore().
- unlock()
Unlock parameter for writing.
- await wait(value, sleep_time=<Quantity(0.1, 'second')>, timeout=None)
Wait until the parameter value is value. sleep_time is the time to sleep between consecutive checks. timeout specifies the maximum waiting time.
- property writable
Return True if the parameter is writable.
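The stack semantics of stash() and restore() can be pictured with a small stand-alone sketch. It is an illustration under assumptions, not Concert code: SketchValue is a hypothetical class, plain integers replace real parameter values, and PermissionError stands in for whatever exception Concert raises on a locked parameter.

```python
import asyncio


class SketchValue:
    """Hypothetical stand-in for a writable parameter value."""

    def __init__(self, value):
        self._value = value
        self._stack = []        # values saved by stash(), newest last
        self._locked = False

    async def get(self):
        return self._value

    async def set(self, value):
        if self._locked:
            # Assumption: the real library raises its own exception type.
            raise PermissionError("parameter is locked for writing")
        self._value = value

    async def stash(self):
        # Push the current value; repeated calls grow the stack.
        self._stack.append(self._value)

    async def restore(self):
        # Nothing saved: do nothing.
        if self._stack:
            await self.set(self._stack.pop())

    def lock(self):
        self._locked = True

    def unlock(self):
        self._locked = False


async def demo():
    value = SketchValue(1)
    await value.stash()       # saves 1
    await value.set(2)
    await value.stash()       # saves 2
    await value.set(3)
    await value.restore()     # back to 2
    second = await value.get()
    await value.restore()     # back to 1
    first = await value.get()
    await value.restore()     # stack is empty, value stays 1
    return second, first, await value.get()


print(asyncio.run(demo()))
```

Each restore() undoes exactly one stash(), in reverse order, which is why nested stash calls can be unwound one by one.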
- class concert.base.Quantity(unit, fget=None, fset=None, fget_target=None, lower=None, upper=None, data=None, check=None, external_lower_getter=None, external_upper_getter=None, user_lower_getter=None, user_lower_setter=None, user_upper_getter=None, user_upper_setter=None, help=None)
Bases: Parameter
A Parameter associated with a unit.
fget, fset, data, check and help are identical to the Parameter constructor arguments.
unit is a Pint quantity. lower and upper denote soft limits between which the Quantity values can lie.
- class concert.base.QuantityValue(instance, quantity)
Bases: ParameterValue
- await get(wait_on=None)
Get coroutine obtaining the concrete value of this object.
If wait_on is not None, it must be an awaitable on which this method waits.
- lock_limits(permanent=False)
Lock limits, if permanent is True the limits cannot be unlocked anymore.
- await set(value, wait_on=None)
Set concrete value on the object.
If wait_on is not None, it must be an awaitable on which this method waits.
- unlock_limits()
Unlock limits.
- await wait(value, eps=None, sleep_time=<Quantity(0.1, 'second')>, timeout=None)
Wait until the parameter value is value. eps is the allowed discrepancy between the actual value and value. sleep_time is the time to sleep between consecutive checks. timeout specifies the maximum waiting time.
Collection of parameters
- class concert.base.Parameterizable(self)
Collection of parameters.
For each class of type Parameterizable, Parameter can be set as class attributes:
    class Device(Parameterizable):

        def get_something(self):
            return 'something'

        something = Parameter(get_something)
There is a simple Parameter and a parameter which models a physical quantity, Quantity.
A Parameterizable is iterable and returns its parameters of type ParameterValue or its subclasses:
    for param in device:
        print("name={}".format(param.name))
To access a single parameter object by name, you can use the [] operator:
    param = device['position']
If the parameter name does not exist, a ParameterError is raised.
Each parameter value is accessible as a property. If a device has a position it can be read and written with:
    param.position = 0 * q.mm
    print(param.position)
Setting limits and values can be handled in two ways:
    from concert.base import Quantity, Parameterizable
    from concert.quantities import q

    lower_foo = None
    upper_foo = None
    position_foo = 0 * q.mm

    def get_lower_foo_softlimit():
        global lower_foo
        return lower_foo

    def get_upper_foo_softlimit():
        global upper_foo
        return upper_foo

    def set_lower_foo_softlimit(value):
        global lower_foo
        lower_foo = value

    def set_upper_foo_softlimit(value):
        global upper_foo
        upper_foo = value

    def get_foo_from_hardware():
        global position_foo
        return position_foo

    def send_foo_to_hardware(value):
        global position_foo
        position_foo = value

    # First way: accessors are methods of the device class, found by name.
    class DeviceWithClassGetter(Parameterizable):

        foo = Quantity(q.mm)

        async def __ainit__(self):
            await super().__ainit__()

        async def _get_foo(self):
            return get_foo_from_hardware()

        async def _set_foo(self, value):
            send_foo_to_hardware(value)

        async def _get_foo_lower_external_limit(self):
            return -4 * q.mm

        async def _get_foo_upper_external_limit(self):
            return 4 * q.mm

        async def _get_foo_lower_user_limit(self):
            return get_lower_foo_softlimit()

        async def _get_foo_upper_user_limit(self):
            return get_upper_foo_softlimit()

        async def _set_foo_lower_user_limit(self, val):
            return set_lower_foo_softlimit(val)

        async def _set_foo_upper_user_limit(self, val):
            return set_upper_foo_softlimit(val)

    lower_bar = None
    upper_bar = None
    position_bar = 0 * q.mm

    async def get_lower_bar_softlimit():
        global lower_bar
        return lower_bar

    async def get_upper_bar_softlimit():
        global upper_bar
        return upper_bar

    async def set_lower_bar_softlimit(value):
        global lower_bar
        lower_bar = value

    async def set_upper_bar_softlimit(value):
        global upper_bar
        upper_bar = value

    async def get_bar_from_hardware(cls):
        global position_bar
        return position_bar

    async def send_bar_to_hardware(cls, value):
        global position_bar
        position_bar = value

    # Second way: accessors are passed to the Quantity constructor.
    class DeviceWithSetterInConstructor(Parameterizable):

        bar = Quantity(q.mm,
                       fget=get_bar_from_hardware,
                       fset=send_bar_to_hardware,
                       user_lower_getter=get_lower_bar_softlimit,
                       user_upper_getter=get_upper_bar_softlimit,
                       user_lower_setter=set_lower_bar_softlimit,
                       user_upper_setter=set_upper_bar_softlimit)

        async def __ainit__(self):
            await super().__ainit__()
- install_parameters(params)
Install parameters at run-time.
params is a dictionary mapping parameter names to Parameter objects.
- lock(permanent=False)
Lock all the parameters for writing. If permanent is True, the parameters cannot be unlocked anymore.
- await restore()
Restore all parameters saved with Parameterizable.stash().
- await stash()
Save all writable parameters that can be restored with Parameterizable.restore().
The values are stored on a stack, hence subsequent saved states can be restored one by one.
- unlock()
Unlock all the parameters for writing.
- class concert.base.RunnableParameterizable(self)
State machine
- class concert.base.State(default=None, fget=None, fset=None, data=None, check=None, help=None)
Finite state machine.
Use this on a class, to keep some sort of known state. In order to enforce restrictions, you would decorate methods on the class with check():
    class SomeObject(object):

        state = State(default='standby')

        @check(source='*', target='moving')
        def move(self):
            pass
In case your device doesn’t provide information on its state you can use the transition() to store the state in an instance of your device:
    @transition(immediate='moving', target='standby')
    def _set_some_param(self, param_value):
        # when the method starts device state is set to *immediate*
        # long operation goes here
        pass
        # the state is set to *target* in the end
Accessing the state variable will return the current state value, i.e.:
    obj = SomeObject()
    assert obj.state == 'standby'
The state cannot be set explicitly by:
    obj.state = 'some_state'
but the object needs to provide methods which transition out of states; the same holds for transitioning out of error states. If the _get_state() method is implemented in the device it is always used to get the state, otherwise the state is stored in software.
fget is a callable that is called when reading the parameter. fset is called when the parameter is written to. fget_target is a getter for the target value. fget, fset, fget_target must be member functions of the corresponding Parameterizable object.
data is passed to the state check function.
check is a check() that changes states when a value is written to the parameter.
help is a string describing the parameter in more detail.
- concert.base.check(source: Union[str, List[str]] = '*', target: Union[str, List[str]] = '*', state_name='state')
Decorates a method for checking the device state.
source denotes the source state that must be present at the time of invoking the decorated method. target is the state that the state object will be in after successful completion of the method, or a list of possible target states. state_name can be used to define the state if multiple states are used.
- concert.base.transition(immediate=None, target=None, state_name='state')
Change software state of a device to immediate. After the function execution finishes change the state to target. On asyncio.CancelledError, state is set to target and cleanup logic must take place in the callable to be wrapped. state_name can be used to define the state if multiple states are used.
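The behaviour described for transition() can be sketched as a plain decorator. This is a hypothetical re-creation for illustration, not Concert's code: sketch_transition and Motor are made-up names, the state is a simple attribute rather than a State object, and what happens on exceptions other than asyncio.CancelledError is an assumption (here the state is left untouched).

```python
import asyncio
import functools


def sketch_transition(immediate, target):
    """Hypothetical decorator: set state to immediate, then to target."""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(self, *args, **kwargs):
            self.state = immediate
            try:
                result = await func(self, *args, **kwargs)
            except asyncio.CancelledError:
                # Cancellation also ends in the target state.
                self.state = target
                raise
            # Other exceptions propagate and leave the state as it is
            # (an assumption of this sketch).
            self.state = target
            return result
        return wrapper
    return decorator


class Motor:
    def __init__(self):
        self.state = "standby"
        self.seen = []

    @sketch_transition(immediate="moving", target="standby")
    async def move(self):
        self.seen.append(self.state)    # state observed while running
        await asyncio.sleep(0)


async def demo():
    motor = Motor()
    await motor.move()
    return motor.seen, motor.state


print(asyncio.run(demo()))
```

While move() runs the object reports the immediate state, and once it returns the object is in the target state.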
Devices
- class concert.devices.base.Device(self)
Bases: Parameterizable
A Device provides locked access to a real-world device.
It implements the context protocol to provide locking:
    async with device:
        # device is locked
        await device.set_parameter(1 * q.m)
        ...

    # device is unlocked again
- await emergency_stop()
Emergency stop.
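How an asynchronous context manager can provide this kind of locking is shown in the sketch below. It assumes an asyncio.Lock as the locking primitive, which may differ from what Device uses internally. SketchDevice and set_position are hypothetical names.

```python
import asyncio


class SketchDevice:
    """Hypothetical device that is locked while used in `async with`."""

    def __init__(self):
        self._lock = asyncio.Lock()
        self.position = 0

    async def __aenter__(self):
        await self._lock.acquire()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Always release, also when the body raised.
        self._lock.release()

    async def set_position(self, value):
        await asyncio.sleep(0)      # stands in for hardware communication
        self.position = value


async def demo():
    device = SketchDevice()
    async with device:
        locked_inside = device._lock.locked()
        await device.set_position(1)
    return locked_inside, device._lock.locked(), device.position


print(asyncio.run(demo()))
```

A second coroutine entering `async with device` at the same time would wait in __aenter__ until the first one leaves the block.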
- class concert.devices.dummy.DeviceWithClassGetter(self)
Example of a device that uses setters/getters for limits in the device class.
A real device would talk to the hardware or a database for storing/receiving the values.
- class concert.devices.dummy.DeviceWithSetterInConstructor(self)
Example of a device that uses setters/getters for limits passed to the Quantity constructor.
A real device would talk to the hardware or a database for storing/receiving the values.
Asynchronous execution
- exception concert.coroutines.base.WaitError
Raised on busy waiting timeouts
- concert.coroutines.base.background(corofunc)
Same as start(), just meant to be used as a decorator.
- concert.coroutines.base.broadcast(producer, *consumers)
Feed producer to all consumers.
- await concert.coroutines.base.ensure_coroutine(func, *args, **kwargs)
func(*args, **kwargs) returns an awaitable which is wrapped here into a real coroutine. This is useful for turning futures from other libraries, like Tango, into real coroutines.
- await concert.coroutines.base.feed_queue(producer, func, *args)
Feed function func with items from producer in a separate thread. The signature must be func(queue, *args) where elements in the queue are instances of concert.helpers.PrioItem.
- concert.coroutines.base.get_event_loop()
Get asyncio’s event loop.
- await concert.coroutines.base.run_in_executor(func, *args)
Run a blocking function func with signature func(*args) in an executor. If run_in_executor() is cancelled, it blocks until func finishes so that all resources are in consistent state.
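The general idea of running a blocking function in an executor can be tried with asyncio alone. The sketch below uses the standard loop.run_in_executor() call and does not reproduce the cancellation behaviour described above. blocking_read is a hypothetical stand-in for a slow call.

```python
import asyncio
import time


def blocking_read(delay, value):
    """Stand-in for a blocking call, e.g. a slow hardware read."""
    time.sleep(delay)
    return value


async def read_without_blocking_loop():
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor.
    future = loop.run_in_executor(None, blocking_read, 0.05, 42)
    ticks = 0
    while not future.done():
        # The event loop stays responsive while blocking_read runs.
        await asyncio.sleep(0.01)
        ticks += 1
    return await future, ticks


result, ticks = asyncio.run(read_without_blocking_loop())
print(result, ticks)
```

The counter shows that other coroutine code keeps running while the blocking function occupies a worker thread.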
- concert.coroutines.base.run_in_loop(coroutine, error_msg_if_running=None)
Wrap coroutine into an asyncio.Task, run it in the current loop, block until it finishes and return the result. On KeyboardInterrupt, the task is cancelled. Raise RuntimeError with message error_msg_if_running in case the loop is already running, otherwise Python will take care of the error reporting.
- concert.coroutines.base.start(awaitable)
Wrap awaitable into a task and start its execution right away. The returned task will also be cancellable by ctrl-k.
- await concert.coroutines.base.wait_until(condition, sleep_time=<Quantity(0.1, 'second')>, timeout=None)
Wait until a callable condition returns True. sleep_time is the time to sleep between consecutive checks of condition. If timeout is given and the condition doesn’t return True within the time specified by it, a WaitError is raised.
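The polling loop behind such a wait can be sketched as follows. This is an illustration under assumptions, not the library's code: sketch_wait_until and SketchWaitError are hypothetical names, times are plain floats in seconds instead of quantities, and condition is taken to be an ordinary synchronous callable.

```python
import asyncio
import time


class SketchWaitError(Exception):
    """Hypothetical stand-in for the timeout exception."""


async def sketch_wait_until(condition, sleep_time=0.01, timeout=None):
    """Poll condition() until it returns True or timeout seconds pass."""
    start = time.monotonic()
    while not condition():
        if timeout is not None and time.monotonic() - start > timeout:
            raise SketchWaitError("condition not met within timeout")
        await asyncio.sleep(sleep_time)


async def demo():
    calls = {"n": 0}

    def ready():
        calls["n"] += 1
        return calls["n"] >= 3      # becomes True on the third check

    await sketch_wait_until(ready)
    try:
        await sketch_wait_until(lambda: False, timeout=0.05)
        timed_out = False
    except SketchWaitError:
        timed_out = True
    return calls["n"], timed_out


print(asyncio.run(demo()))
```

A shorter sleep_time reacts faster at the cost of more checks. Without a timeout the loop waits indefinitely.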
Exceptions
- class concert.base.UnitError
Raised when an operation is passed value with an incompatible unit.
- class concert.base.LimitError
Raised when an operation is passed a value that exceeds a limit.
- class concert.base.ParameterError(parameter)
Raised when a parameter is accessed that does not exist.
- class concert.base.AccessorNotImplementedError
Raised when a setter or getter is not implemented.
- class concert.base.ReadAccessError(parameter)
Raised when user tries to read a parameter that cannot be read.
- class concert.base.WriteAccessError(parameter)
Raised when user tries to write a parameter that cannot be written.
- class concert.base.StateError(error_state, msg=None)
Raised in state check functions of devices.
Configuration
- concert.config.MOTOR_VELOCITY_SAMPLING_TIME
Time step for calculation of motor velocity by measuring two position values. Longer values will create more accurate results but reading the velocity will take more time.
- concert.config.PROGRESS_BAR
Turn on the progress bar for long-lasting operations if the tqdm package is present.
Sessions
- class concert.session.utils.SubCommand(name, opts)
Base sub-command class (concert [subcommand]).
SubCommand objects are loaded at run-time and injected into Concert’s command parser.
name denotes the name of the sub-command parser, e.g. “mv” for the MoveCommand. opts must be an argparse-compatible dictionary of command options.
- abstractmethod run(*args, **kwargs)
Run the command
- concert.session.utils.abort_awaiting(skip=None)
Cancel background tasks. skip are coroutine names which are not cancelled.
- await concert.session.utils.check_emergency_stop(check, poll_interval=0.1 * q.s, exit_session=False)
If the callable check returns True, abort is called. Nothing further is done until check returns False again, after which the process starts over. poll_interval is the interval at which check is called. If exit_session is True the session exits when the emergency stop occurs.
- concert.session.utils.code_of(func)
Show implementation of func.
- concert.session.utils.ddoc()
Render device documentation.
- concert.session.utils.dstate()
Render device state in a table.
- concert.session.utils.get_default_table(field_names, widths=None)
Return a prettytable styled for use in the shell. field_names is a list of table header strings.
- concert.session.utils.pdoc(hide_blacklisted=True)
Render process documentation.
Networking
Networking package facilitates all network connections, e.g. sockets and Tango.
Socket Connections
- class concert.networking.base.SocketConnection(host, port, return_sequence='\n')
A two-way socket connection. return_sequence is a string appended after every command indicating the end of it, the default value is a newline (\n).
- await close()
Close connection.
- await connect()
Open connection.
- await execute(data, num=1024)
Execute command and wait for response (coroutine-safe, not thread-safe). Read num bytes from the socket.
- await recv(num=1024)
Read num bytes from the socket. The result is first stripped from the trailing return sequence characters and then returned.
- await send(data)
Send data to the peer. The return sequence characters are appended to the data before it is sent.
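The role of the return sequence in send() and recv() can be shown with two small helpers. They are hypothetical and work on plain strings without opening a socket. The command and response values are made up.

```python
RETURN_SEQUENCE = "\n"      # the documented default


def frame_command(data, return_sequence=RETURN_SEQUENCE):
    """Append the return sequence, as send() is described to do."""
    return data + return_sequence


def strip_response(raw, return_sequence=RETURN_SEQUENCE):
    """Remove trailing return sequence characters, as recv() is described to do."""
    return raw.rstrip(return_sequence)


print(repr(frame_command("POS?")))
print(repr(strip_response("12.5\n")))
```

With a different terminator, for example "\r\n", both helpers take it through the return_sequence argument.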
TANGO
Tango devices are interfaced by PyTango. One can obtain the DeviceProxy with
the get_tango_device() function.
- concert.networking.base.get_tango_device(uri, peer=None, timeout=<Quantity(10, 'second')>)
Get a Tango device by specifying its uri. If peer is given, change the tango_host specifying which database to connect to. Format is host:port as a string. timeout sets the device’s general timeout. It is converted to milliseconds, converted to integer and then the tango device’s set_timeout_millis is called with the converted integer value.
Helpers
- class concert.helpers.Bunch(values)
Encapsulate a list or dictionary to provide attribute-like access.
Common use cases look like this:
    d = {'foo': 123, 'bar': 'baz'}
    b = Bunch(d)
    print(b.foo)
    >>> 123

    l = ['foo', 'bar']
    b = Bunch(l)
    print(b.foo)
    >>> 'foo'
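The attribute-like access shown above can be reproduced in a few lines. SketchBunch is a hypothetical re-creation for illustration and may differ from the real Bunch in details such as error handling.

```python
class SketchBunch:
    """Hypothetical re-creation of attribute-like access."""

    def __init__(self, values):
        if isinstance(values, dict):
            items = values.items()
        else:
            # For a list, each entry becomes both the name and the value.
            items = ((v, v) for v in values)
        for name, value in items:
            setattr(self, name, value)


b = SketchBunch({'foo': 123, 'bar': 'baz'})
print(b.foo, b.bar)
c = SketchBunch(['foo', 'bar'])
print(c.foo)
```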
- class concert.helpers.CommData(host: str, port: int = None, protocol: str = 'tcp', socket_type: SocketType = SocketType.PUSH, sndhwm: int = 1000)
Encapsulates communication metadata.
- class concert.helpers.ImageWithMetadata(input_array: ndarray, metadata: dict | None = None)
Subclass of numpy.ndarray with a metadata dictionary, holding an image together with its metadata.
- Parameters
input_array – Input array (can be an instance of ImageWithMetadata)
metadata – metadata
- class concert.helpers.PerformanceTracker(summary=False, loglevel=8)
A stopwatch with the ability to report data throughput.
- Parameters
summary – if True, output everything at the end, otherwise immediately on lap() call.
loglevel – logging level
- class concert.helpers.PrioItem(priority: int, data: Any)
To be used in combination with queue.PriorityQueue.
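A common way to build such an item for queue.PriorityQueue is a dataclass that compares on the priority only. The sketch below follows that standard-library pattern. SketchPrioItem is a hypothetical name and the real PrioItem may be implemented differently.

```python
import queue
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class SketchPrioItem:
    """Hypothetical item ordered by priority only."""
    priority: int
    data: Any = field(compare=False)    # payload is ignored when comparing


pq = queue.PriorityQueue()
pq.put(SketchPrioItem(2, {"frame": "late"}))
pq.put(SketchPrioItem(0, {"frame": "first"}))
pq.put(SketchPrioItem(1, {"frame": "middle"}))

# Items come out lowest priority number first.
order = [pq.get().data["frame"] for _ in range(3)]
print(order)
```

Excluding the payload from comparison matters because payloads such as dictionaries or arrays cannot be ordered.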
- concert.helpers.arange(start, stop, step)
This function wraps numpy.arange: it strips the units from the arguments beforehand and attaches the unit to the resulting numpy.array afterwards.
- Parameters
start (concert.quantities.q.Quantity) –
stop (concert.quantities.q.Quantity) –
step (concert.quantities.q.Quantity) –
- Returns
- class concert.helpers.expects(*args, **kwargs)
Decorator which determines the expected arguments of the function and also checks the correctness of the given arguments. If the input arguments differ from the expected ones, a TypeError is raised.
For numeric arguments use Numeric class with 2 parameters: dimension of the array and units (optional). E.g. “Numeric (1)” means function expects one number or “Numeric (2, q.mm)” means function expects expression like [4,5]*q.mm
Common use case looks like this:
    from concert.helpers import expects, Numeric

    @expects(Camera, LinearMotor, pixelsize=Numeric(2, q.mm))
    def foo(camera, motor, pixelsize=None):
        pass
- concert.helpers.is_iterable(item)
Is item iterable or not.
- concert.helpers.linspace(start, stop, num, endpoint=True)
This function wraps numpy.linspace: it strips the units from the arguments beforehand and attaches the unit to the resulting numpy.array afterwards.
- concert.helpers.measure(func=None, return_result=False)
Measure and print execution time of func.
If return_result is True, the decorated function returns a tuple consisting of the original return value and the measured time in seconds.
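A timing decorator with this calling convention could look like the sketch below. sketch_measure is a hypothetical re-creation. The message format and the use of time.perf_counter are assumptions.

```python
import functools
import time


def sketch_measure(func=None, return_result=False):
    """Hypothetical timing decorator, usable with or without arguments."""
    if func is None:
        # Called as @sketch_measure(return_result=True)
        return functools.partial(sketch_measure, return_result=return_result)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start
        print("{} took {:.6f} s".format(func.__name__, duration))
        return (result, duration) if return_result else result

    return wrapper


@sketch_measure
def add(a, b):
    return a + b


@sketch_measure(return_result=True)
def slow_add(a, b):
    time.sleep(0.01)
    return a + b


print(add(1, 2))
value, seconds = slow_add(1, 2)
```

With return_result left at False the decorated function returns exactly what the original returns, so existing callers are unaffected.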
- concert.helpers.memoize(func)
Memoize the result of func.
Remember the result of func depending on its arguments. Note that this requires the function to be free from any side effects, i.e. it returns the same value given the same arguments.
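A minimal sketch of memoization, assuming hashable positional arguments only. sketch_memoize is a hypothetical name, and the side effect in the example exists only to make cache hits visible.

```python
import functools


def sketch_memoize(func):
    """Hypothetical cache keyed on positional arguments."""
    cache = {}

    @functools.wraps(func)
    def wrapper(*args):
        if args not in cache:
            cache[args] = func(*args)
        return cache[args]

    return wrapper


calls = []


@sketch_memoize
def square(x):
    calls.append(x)     # side effect, only to make cache hits visible
    return x * x


print(square(4), square(4), square(5))
print(calls)
```

The second call with the same argument is answered from the cache, so the wrapped function runs only once per distinct argument.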
Storage
storage.py
Storage implementations.
- class concert.storage.DirectoryWalker(self, root: Optional[str] = None, dsetname: str = 'frame_{:>06}.tif', writer: Type[TiffWriter] = <class 'concert.writers.TiffWriter'>, start_index: int = 0, bytes_per_file: int = 0, rights: str = '750')
A DirectoryWalker moves through a file system and writes flat files using a specific filename template.
- await __ainit__(root: Optional[str] = None, dsetname: str = 'frame_{:>06}.tif', writer: Type[TiffWriter] = <class 'concert.writers.TiffWriter'>, start_index: int = 0, bytes_per_file: int = 0, rights: str = '750') None
Use writer to write data to files with filenames with a template from dsetname. start_index specifies the number in the first file name, e.g. for the default dsetname and start_index 100, the first file name will be frame_000100.tif.
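The file naming can be checked with plain string formatting. The sketch assumes that indices simply count up from start_index, one per file name.

```python
dsetname = 'frame_{:>06}.tif'   # default template from the signature above
start_index = 100

# Indices count up from start_index, one per file name.
names = [dsetname.format(start_index + i) for i in range(3)]
print(names)
```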
- await log_to_json(payload: str, filename: str = 'experiment.json') None
Logs experiment metadata as payload to a file called experiment.json
NOTE: This method does not have to be a coroutine. We still made it so to maintain coherence at the api level. With the unification of the top layer walker api this concern would be addressed.
- Parameters
payload (str) – content to write
- class concert.storage.DummyWalker(self, root: str = '')
Walker object used for testing purposes
- await exists(*paths) bool
Return True if path from current position specified by a list of paths exists.
- class concert.storage.RemoteDirectoryWalker(self, device: RemoteDirectoryWalkerTangoDevice, root: Optional[str] = None, dsetname: str = 'frame_{:>06}.tif', wrt_cls: str = 'TiffWriter', start_index: int = 0, bytes_per_file: int = 0)
Defines the api layer of a directory walker for a remote file system. Encapsulates a Tango device which runs on the remote file system where the data needs to be written.
- await __ainit__(device: RemoteDirectoryWalkerTangoDevice, root: Optional[str] = None, dsetname: str = 'frame_{:>06}.tif', wrt_cls: str = 'TiffWriter', start_index: int = 0, bytes_per_file: int = 0) None
Initializes a remote directory walker. This walker implementation encapsulates a Tango device server and delegates its core utilities to the same.
- Parameters
device (RemoteDirectoryWalkerTangoDevice) – an abstract tango device conforming to remote tango walker specification
root (Optional[str]) – file system root from which to start traversal, if None current directory of the walker is used
dsetname (str) – template for writing files of the dataset
wrt_cls (str) – specific writer class which the device should use to write files
start_index (int) – number of the first file name in the dataset
bytes_per_file (int) – size limit for a file, 0 denotes 1 file per image
- await home() None
Return to root remotely and inside its own context (which is implemented in the super class)
- await log_to_json(payload: str, filename: str = 'experiment.json') None
Implements api layer for writing experiment metadata
- exception concert.storage.StorageError
Exception related to logical issues with storage
- class concert.storage.Walker(self, root: str, dsetname: str = 'frames')
A Walker moves through an abstract hierarchy and allows writing data at a specific location.
- await __ainit__(root: str, dsetname: str = 'frames') None
Constructor. root is the topmost level of the data structure
- await create_writer(producer: AsyncIterable[ArrayLike], name: Optional[str] = None, dsetname: Optional[str] = None) Awaitable
Create a writer coroutine for writing data set dsetname with images from producer inside. If name is given, descend to it first and once the writer is created ascend back. This way, the writer can operate in name and the walker can be safely used to move around and create other writers elsewhere while the created writer is working. The returned coroutine is not guaranteed to be wrapped into an asyncio.Task, and hence is not guaranteed to be started immediately. This function also does not block after creating the writer. This is useful for splitting the preparation of writing (creating directories, …) and the I/O itself.
- abstractmethod await exists(*paths) bool
Return True if path from current position specified by a list of paths exists.
- abstractmethod await log_to_json(payload: str, filename: str = 'experiment.json') None
Provides local counterpart of the remote logging of experiment metadata. Writes the provided payload to a static file called experiment.json.
- Parameters
payload (str) – content to write
- abstractmethod await register_logger(logger_name: str, log_level: int, file_name: str) AsyncLoggingHandlerCloser
Registers a logger with walker device server and provides a remote logging handler
- await write(producer: AsyncIterable[ArrayLike], dsetname: Optional[str] = None) Awaitable
Create a coroutine for writing data set dsetname with images from producer. The execution starts immediately in the background and await will block until the images are written.
- concert.storage.create_directory(directory, rights='750')
Create directory and all paths along the way if necessary. rights are a string representing a combination for user, group, others.
- concert.storage.read_image(filename)
Read image from file with filename. The file type is detected automatically.
- concert.storage.read_tiff(file_name)
Read a TIFF file from disk using the tifffile module.
- concert.storage.split_dsetformat(dsetname)
Strip the formatting part off dsetname, which leaves us with the data set name.
- Parameters
dsetname (str) – dataset name
- await concert.storage.write_images(producer: AsyncIterable[ArrayLike], writer: Type[TiffWriter] = <class 'concert.writers.TiffWriter'>, prefix: str = 'image_{:>05}.tif', start_index: int = 0, bytes_per_file: int = 0, rights: str = '750') int
Write images to disk with the specified writer and file name prefix. Write to one file until bytes_per_file bytes have been written. If it is 0, then one file per image is created. writer is a subclass of writers.ImageWriter. start_index specifies the number in the first file name, e.g. for the default prefix and start_index 100, the first file name will be image_00100.tif. If prefix is not formattable, images are appended to the filename specified by prefix.
- concert.storage.write_libtiff(file_name, data)
Write a TIFF file using pylibtiff. Return the written file name.
- concert.storage.write_tiff(file_name, data)
The default TIFF writer which uses the tifffile module. Return the written file name.