Data processing¶
Coroutines¶
Coroutines provide a way to process data and yield execution until more data is
produced. Generators represent the source of data and can be used as normal
asynchronous iterators, e.g. in an async for loop. Coroutines can use the output
of a generator either to process data and output a new result item (a filter) or
to process the data without producing further results (a sink). For more on
coroutines, see Concurrent execution.
Data processing with coroutines uses generators and iterates over their items like this:
async def producer(num):
    for i in range(num):
        yield i

async def printer(producer):
    async for item in producer:
        print(item)

# Usage:
await printer(producer(10))
The printer coroutine fetches data items and prints them one by one. Because it
produces no data, this coroutine falls into the sink category. Concert provides
some common pre-defined sinks in the sinks module.
Filters hook into the data stream and process the input to produce some output. For example, to generate a stream of squared input, you would write:
async def square(producer):
    # An async generator: consume the upstream producer, yield the squares
    async for item in producer:
        yield item ** 2

# Usage:
await printer(square(producer(10)))
You can find a variety of pre-defined filters in the filters module.
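The source, filter and sink roles described above can be tried out with plain asyncio, without Concert installed. The following is a minimal sketch; the collect sink is a hypothetical helper introduced only for this illustration and is not one of Concert's pre-defined sinks.

```python
import asyncio


async def producer(num):
    """Source: yield the integers 0 .. num - 1."""
    for i in range(num):
        yield i


async def square(producer):
    """Filter: yield the square of every incoming item."""
    async for item in producer:
        yield item ** 2


async def collect(producer):
    """Hypothetical sink: gather all items into a list instead of printing."""
    result = []
    async for item in producer:
        result.append(item)
    return result


# Chain source -> filter -> sink and run the pipeline to completion.
squares = asyncio.run(collect(square(producer(5))))
print(squares)  # [0, 1, 4, 9, 16]
```

Because every stage only pulls items from the stage before it, filters can be stacked in any order before the final sink.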
Broadcasting¶
To fan out a single input stream to multiple consumers, you can use
broadcast(). Its first argument is the producer and the rest are consumers.
broadcast() creates the connections from the producer to the consumers and
returns a list of coroutines, which can be passed to asyncio.gather() like this:
import asyncio
from concert.coroutines.base import broadcast

coros = broadcast(producer(10), printer, printer)
await asyncio.gather(*coros)
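To make the fan-out idea more concrete, here is a sketch of how such a function could be built from asyncio queues. It is an assumption made for illustration only, not Concert's actual implementation: simple_broadcast, total and count are hypothetical names, and the real broadcast() may behave differently in its details.

```python
import asyncio

_END = object()  # sentinel marking the end of the stream


def simple_broadcast(producer, *consumers):
    """Hypothetical stand-in for broadcast(), not Concert's implementation."""
    queues = [asyncio.Queue(maxsize=1) for _ in consumers]

    async def feed():
        # Copy every produced item into each consumer's queue.
        async for item in producer:
            for queue in queues:
                await queue.put(item)
        for queue in queues:
            await queue.put(_END)

    async def drain(queue):
        # Present one queue as an asynchronous generator.
        while True:
            item = await queue.get()
            if item is _END:
                break
            yield item

    fed = [consumer(drain(queue)) for consumer, queue in zip(consumers, queues)]
    return [feed()] + fed


async def producer(num):
    for i in range(num):
        yield i


async def total(producer):
    """Sink: sum up all items."""
    acc = 0
    async for item in producer:
        acc += item
    return acc


async def count(producer):
    """Sink: count the items."""
    n = 0
    async for _ in producer:
        n += 1
    return n


async def main():
    # Create the queues inside the running event loop.
    coros = simple_broadcast(producer(4), total, count)
    return await asyncio.gather(*coros)


results = asyncio.run(main())
print(results)  # [None, 6, 4]
```

The first result belongs to the feeding coroutine, the remaining ones to the two sinks. The bounded queues mean that the slowest consumer sets the pace for everyone.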
High-performance processing¶
Generators and coroutines yield execution cooperatively. If data consumption must not stall data production, the consuming coroutine should only buffer the data and delegate the real consumption to a separate thread or process. Alternatively, a generator can buffer the data first and yield the items afterwards. Buffering is necessary because a generator does not produce a new value until the previous one has been consumed.
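The buffering approach can be sketched with an unbounded asyncio.Queue that is filled by a background task. The buffered helper below is a hypothetical name used for illustration and is not part of Concert's API; it only shows the principle of decoupling production from consumption.

```python
import asyncio

_END = object()  # sentinel marking the end of the stream


async def buffered(producer):
    """Hypothetical helper: drain *producer* in the background into a queue."""
    queue = asyncio.Queue()  # unbounded, so the producer never waits

    async def fill():
        try:
            async for item in producer:
                queue.put_nowait(item)
        finally:
            queue.put_nowait(_END)

    task = asyncio.create_task(fill())
    try:
        while True:
            item = await queue.get()
            if item is _END:
                break
            yield item
        await task  # re-raise an exception from the producer, if any
    finally:
        task.cancel()  # no effect if the task has already finished


async def producer(num, log):
    for i in range(num):
        log.append(('produced', i))
        yield i


async def slow_consumer(producer, log):
    async for item in producer:
        await asyncio.sleep(0.01)  # simulate slow processing
        log.append(('consumed', item))


async def main():
    log = []
    await slow_consumer(buffered(producer(3, log)), log)
    return log


log = asyncio.run(main())
print(log)
```

With the buffer in place, all three items are produced before the first one is consumed. Passing producer(3, log) directly to slow_consumer would instead interleave production and consumption. Note that an unbounded buffer trades memory for throughput.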
High-performance computing¶
The ufo module provides classes to process data from an experiment with
the UFO data processing framework. The simplest example could look like this:
from concert.ext.ufo import InjectProcess
from gi.repository import Ufo
# scipy.misc.ascent() was removed in SciPy 1.12; scipy.datasets.ascent()
# provides the same test image (use scipy.misc on older SciPy versions)
import scipy.datasets

pm = Ufo.PluginManager()
writer = pm.get_task('write')
writer.props.filename = 'foo-%05i.tif'

proc = InjectProcess(writer)
proc.start()
await proc.insert(scipy.datasets.ascent())
proc.wait()
To save yourself some time, the ufo module provides a wrapper around the
raw UfoPluginManager:
from concert.ext.ufo import PluginManager
pm = PluginManager()
writer = pm.get_task('write', filename='foo-%05i.tif')
Viewing processed data¶
Concert has a Matplotlib integration to simplify viewing 1D time series with the
PyplotViewer. For 2D data, there are multiple implementations; for details,
see Viewers and Concert examples.
Writing image data¶
Concert provides DirectoryWalker for traversing the filesystem and
writing image sequences. You can use its descend() method to descend into
a sub-directory and the ascend() method to return one level back.
If you just want to write images in the current directory, use the write()
method. To create an image writer in either the current directory or one level
below, you can use the create_writer() method. This method creates the writer
and, if you specify a sub-directory, also ascends back. You should call it
inside an async with statement to make sure that no other coroutine changes the
walker’s path while you are creating the image writer. The writing itself can
then happen after the async with statement:
async with walker:
    writer = walker.create_writer(producer, name='subdirectory')
# create_writer ascends back so the writing itself can happen outside of the
# with statement
await writer