Coroutines provide a way to process data and yield execution until more data is produced. Generators represent the source of data and can used as normal iterators, e.g. in a for loop. Coroutines can use the output of a generator to either process data and output a new result item in a filter fashion or process the data without further results in a sink fashion.
Coroutines are simple functions that get their input by calling yield on the right side or as an argument. Because they need to be started in a particular way, it is useful to decorate a coroutine with the coroutine() decorator:
from concert.coroutines.base import coroutine @coroutine def printer(): while True: item = yield print(item)
This coroutine fetches data items and prints them one by one. Because no data is produced, this coroutine falls into the sink category. Concert provides some common pre-defined sinks in the sinks module.
Filters hook into the data stream and process the input to produce some output. For example, to generate a stream of squared input, you would write:
@coroutine def square(consumer): while True: item = yield consumer.send(item**2)
You can find a variety of pre-defined filters in the filters module.
Connecting data sources with coroutines¶
In order to connect a generator that yields data to a filter or a sink it is necessary to bootstrap the pipeline by using the inject() function, which forwards generated data to a coroutine:
from concert.coroutines.base import inject def generator(n): for i in range(n): yield i # Use the output of generator to feed into printer inject(generator(5), printer())
To fan out a single input stream to multiple consumers, you can use the broadcast() like this:
from concert.coroutines.base import broadcast source(5, broadcast(printer(), square(printer())))
The generators and coroutines yield execution, but if the data production should not be stalled by data consumption the coroutine should only provide data buffering and delegate the real consumption to a separate thread or process. The same can be achieved by first buffering the data and then yielding them by a generator. It comes from the fact that a generator will not produce a new value until the old one has been consumed.
The ufo module provides classes to process data from an experiment with the UFO data processing framework. The simplest example could look like this:
from concert.ext.ufo import InjectProcess from gi.repository import Ufo import numpy as np import scipy.misc pm = Ufo.PluginManager() writer = pm.get_task('writer') writer.props.filename = 'foo-%05i.tif' proc = InjectProcess(writer) proc.run() proc.push(scipy.misc.lena()) proc.join()
To save yourself some time, the ufo module provides a wrapper around the raw UfoPluginManager:
from concert.ext.ufo import PluginManager pm = PluginManager() writer = pm.get_task('writer', filename='foo-%05i.tif')
Viewing processed data¶
from concert.devices.cameras.dummy import Camera from concert.ext.viewers import PyplotImageViewer # Create a camera and execute something with it in recording state camera = Camera() with camera.recording(): # Create a viewer and show one frame viewer = PyplotImageViewer() viewer.show(camera.grab())