Std.Stream
A stream of elements.
A stream is an infinite sequence of elements. Since the stream is defined coinductively it can be processed only corecursively. That means that in general, one cannot aggregate a stream into normal (inductive) data. But it is still possible to get an aggregate snapshot of some intermediate state in the form of futures.
Streams can be observed and combined. There is no built-in notion of the end of a stream, and all streams are considered infinite. It is still possible to simulate the end of a stream by using a future that designates the end-of-stream condition.
Streams can be made lazy in the sense that if no one is watching a stream, then no work should be performed to feed the stream. This requires some cooperation from the feeder, as it should use on_subscribe
and on_unsubscribe
functions to react to users' subscriptions. The has_subscribers
is also useful.
Streams also provide a pushback mechanism that allows a consumer to ask a cooperative producer to limit its rate. The pushback interface consists of two functions:
wait
which should be called by a consumer when it wants to ask a producer to wait for a moment;
on_wait
which is called when any consumer requests a pause.
The pushback interface is not mandatory.
type 'a t = 'a stream
module Variadic : Variadic.S with type 'a arg = 'a t
Applying function to multiple streams.
create ()
returns a stream and a signal handler that is used to feed the stream. Every time a value is signaled, it will occur in the stream.
from f
returns a stream that is generated from successive applications of a function f
. A new value is produced by the stream every time it is signaled with the associated signal handler.
unfold ~init ~f
is a more general way of building a stream than from
. It allows state to be passed between consecutive invocations of the generator function. A new value is produced by the stream every time it is signaled with the associated signal handler.
unfold_until ~init ~f
returns (stream,signal,future)
. It is the same as unfold
, except that function f
is called until it returns a None
value. Once this happens, the future
becomes determined.
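The stepping behaviour described above can be modeled without the library. The following is only a toy sketch using the OCaml standard library, not the library's implementation: the names `gen`, `make`, `signal` and `countdown` are hypothetical, a mutable list stands in for the stream, and a boolean flag stands in for the future. It also assumes that signals sent after `f` returns `None` have no effect.

```ocaml
(* Toy model of unfold_until: NOT the library's implementation.
   [produced] stands in for the stream, [finished] for the future. *)
type ('a, 'b) gen = {
  mutable state : 'b;
  mutable produced : 'a list;   (* values emitted so far, newest first *)
  mutable finished : bool;      (* plays the role of the returned future *)
  step : 'b -> ('a * 'b) option;
}

let make ~init ~f =
  { state = init; produced = []; finished = false; step = f }

(* One call models one signal sent to the generator. *)
let signal g =
  if not g.finished then
    match g.step g.state with
    | Some (x, s') ->
      g.produced <- x :: g.produced;
      g.state <- s'
    | None -> g.finished <- true

(* Count down from 3: emits 3, 2, 1 and then finishes. *)
let countdown =
  make ~init:3 ~f:(fun n -> if n > 0 then Some (n, n - 1) else None)

let () =
  (* Five signals: three produce values, the fourth finishes,
     the fifth is ignored (an assumption of this model). *)
  for _i = 1 to 5 do signal countdown done;
  List.iter (Printf.printf "%d ") (List.rev countdown.produced);
  print_newline ()
```

In this model the generator function is the only place that decides when the sequence ends, which mirrors the role of the `None` result in the description above.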
unfold' ~init ~f
is a batched version of the unfold
function. A new value is produced by the stream every time it is signaled with the associated signal handler.
repeat x
returns a stream xs
and a signal s
. Every time s
is signaled stream xs
will produce a value x
of_list xs
returns a stream ss
, a signal s
and a future es
. The stream will produce the elements of xs
one after another, one element every time the signal s
is sent. Once all elements are produced, the future es
will occur, signifying the end of the underlying sequence. All signals sent after es
has occurred are ignored.
of_array xs
returns a stream ss
, a signal s
and a future es
. The stream will produce the elements of xs
one after another, one element every time the signal s
is sent. Once all elements are produced, the future es
will occur, signifying the end of the underlying sequence. All signals sent after es
has occurred are ignored.
of_seq xs
returns a stream ss
, a signal s
and a future es
. The stream will produce the elements of xs
one after another, one element every time the signal s
is sent. Once all elements are produced, the future es
will occur, signifying the end of the underlying sequence. All signals sent after es
has occurred are ignored.
In order to start to monitor a stream, a user should subscribe to the stream using one of the subscription functions: watch
, observe
, subscribe
.
The subscription can be canceled by using an unsubscribe
function, or by throwing an exception from the callback function. The latter plays well with `with_return` function.
watch s f
watches a stream s
with a function f
. A subscription identifier is passed to the function, so it can be used to unsubscribe from the stream directly from the function.
val observe : 'a t -> ('a -> unit) -> unit
observe s f
is like watch
but a subscription identifier is not passed to the function f
.
subscribe s f
subscribes to a stream s
with a function f
. A subscription identifier is returned.
unsubscribe s id
stops calling the function that was subscribed with the provided identifier id
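The subscription life cycle can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not the library's implementation: `toy_stream`, `toy_create`, `send` and `log` are hypothetical names, subscription identifiers are modeled as plain integers, and the stream and its signal are collapsed into one value for brevity.

```ocaml
(* Toy model of a subscribable stream: NOT the library's implementation. *)
type 'a toy_stream = {
  mutable next_id : int;
  mutable subscribers : (int * ('a -> unit)) list;
}

let toy_create () = { next_id = 0; subscribers = [] }

(* Register a callback and return its identifier. *)
let subscribe s f =
  let id = s.next_id in
  s.next_id <- id + 1;
  s.subscribers <- s.subscribers @ [(id, f)];
  id

(* Remove the callback that was registered under [id]. *)
let unsubscribe s id =
  s.subscribers <- List.filter (fun (id', _) -> id' <> id) s.subscribers

let has_subscribers s =
  match s.subscribers with
  | [] -> false
  | _ -> true

(* Push a value to every current subscriber (hypothetical helper). *)
let send s x = List.iter (fun (_, f) -> f x) s.subscribers

let log : int list ref = ref []
let s : int toy_stream = toy_create ()
let id = subscribe s (fun x -> log := x :: !log)

let () =
  send s 1;
  send s 2;
  unsubscribe s id;
  send s 3  (* nobody is listening any more, so 3 is not logged *)
```

Returning the identifier from `subscribe` lets code outside the callback cancel the subscription, whereas `watch` hands the identifier to the callback itself.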
val wait : 'a t -> unit
wait xs
is a polite way to ask a producer to slow down. Note: the producer is not required to obey.
val has_subscribers : 'a t -> bool
has_subscribers s
is true if someone is watching the stream.
on_subscribe s f
will call a function f
every time someone subscribes to a stream s
on_unsubscribe s f
will call a function f
every time someone cancels a subscription to a stream s
val on_wait : 'a t -> (unit -> unit) -> unit
on_wait s f
will call the function f every time someone watching a stream s
calls wait s
to ask a producer to slow down.
s' = map' s ~f
applies function f
to each value of a stream s
and push values from a resulting queue into the stream s'
. Example:
let q,p,_ = of_list ['a';'b';'c';'.']
let q' = map' q ~f:(function
    | 'a'..'z' as c ->
      Queue.of_list Char.[uppercase c; lowercase c]
    | c -> Queue.singleton c)
Will produce:
<A; a; B; b; C; c; .>
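The flattening behaviour of the example can be checked with a list-based model. This sketch uses only the OCaml standard library and is not the library's implementation: `map_queue`, `queue_of_list` and `expand` are hypothetical names, and a plain list stands in for the stream.

```ocaml
(* List-based model of map': each input element maps to a queue of
   outputs, and the queues are flattened in order.
   NOT the library's implementation. *)
let map_queue xs ~f =
  List.concat_map (fun x -> List.of_seq (Queue.to_seq (f x))) xs

(* Build a standard-library queue from a list, preserving order. *)
let queue_of_list xs =
  let q = Queue.create () in
  List.iter (fun x -> Queue.add x q) xs;
  q

(* Lowercase letters expand to an upper/lower pair; others pass through. *)
let expand = function
  | 'a' .. 'z' as c ->
    queue_of_list [Char.uppercase_ascii c; Char.lowercase_ascii c]
  | c -> queue_of_list [c]

let result = map_queue ['a'; 'b'; 'c'; '.'] ~f:expand

let () =
  List.iter (Printf.printf "%c ") result;
  print_newline ()
```

Because every input yields a queue rather than a single value, one input element can contribute zero, one or many output elements.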
map ss ~f
returns a new stream that is built by applying a function f
to each element of the stream ss
filter_map s ~f
for each value x
in stream s
, produce y
if f x
is Some y
, otherwise ignore x
filter s f
produce a stream that contains the elements of stream s
, for which f
evaluates to true.
either xs ys
is a discriminated union of two streams.
merge xs ys f
merges streams xs
and ys
using function f
. To merge an arbitrary number of streams use the Variadic
interface.
apply fs xs
applies a stream of functions fs
to a stream of values xs, producing a stream of results.
concat ss
returns a stream that will produce elements from the input list of streams ss
. The ordering of the elements of different streams is unspecified, though it is guaranteed that elements of the same stream will preserve their ordering.
concat_merge xs ~f
builds a stream, that will produce elements from the input list and applies f
to all consecutive elements. The ordering of the input list does not mandate the ordering of elements in the output stream, and is undefined. See concat
for more information.
split xs ~f
returns a pair of streams, where the first stream contains fst (f x)
for each x
in xs
and the second stream contains snd (f x)
for each x
in xs
.
zip xs ys
creates a stream that will produce an element (x,y)
every time both xs
and ys
produce elements x
and y
respectively
unzip xs
creates a pair of streams, where the first stream contains fst x
for each x
in xs
and the second stream contains snd x
for each x
in xs
. Essentially, the same as split ~f:Fn.id
once xs
creates a stream that will at most contain the next value produced by xs
and nothing more.
parse ss ~init ~f
parses stream ss
and builds new stream ss'
. Function f
is applied to each consecutive element of the stream ss
with a state s
. If function f
returns None,s'
, then no value is produced in the output stream and state s'
is passed to the next invocation of function f
. If it returns Some x, s'
, then value x
is produced by the output stream and state s'
is passed to a consecutive invocation of f
. If the state type 'b
is an instance of a list type, then parse will be a pushdown automaton. With an arbitrary state type it is possible to build automata that fall between a PDA and a Turing machine (not including the latter).
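The pushdown-automaton reading can be made concrete with a list-based model in which the state is a stack. This is a sketch, not the library's implementation: `parse_list`, `depth_on_close` and `depths` are hypothetical names, a list stands in for the stream, and the argument order of `f` (state first, then element) is an assumption.

```ocaml
(* List-based model of parse: NOT the library's implementation.
   [f state x] returns an optional output together with the next state. *)
let parse_list xs ~init ~f =
  let _, out =
    List.fold_left (fun (s, out) x ->
        match f s x with
        | None, s' -> (s', out)             (* nothing emitted *)
        | Some y, s' -> (s', y :: out))     (* emit y *)
      (init, []) xs in
  List.rev out

(* The state is a stack of open parentheses. Each closing parenthesis
   emits the nesting depth it closes and pops the stack. *)
let depth_on_close stack c =
  match c, stack with
  | '(', _ -> (None, c :: stack)
  | ')', _ :: rest -> (Some (List.length stack), rest)
  | _ -> (None, stack)  (* ignore other input, including unmatched ')' *)

let depths =
  parse_list ['('; '('; ')'; '('; ')'; ')'] ~init:[] ~f:depth_on_close

let () =
  List.iter (Printf.printf "%d ") depths;
  print_newline ()
```

The stack grows and shrinks with the input, so the output depends on unboundedly deep context, which a finite-state filter could not track.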
foldw ss n ~init ~f
performs a windowed fold of the stream. A function f
is folded over n
consecutive elements of ss
, then the result is produced into the output stream, the window is shifted by stride
(defaults to one), and function f
is applied to the next n
elements. For example, if stream ss
produced the following sequence of elements:
1,2,3,4,5,6,7,8
and the window length n
is equal to 3
, then the function f
will be applied to the sequences:
[1,2,3], [2,3,4], [3,4,5], [4,5,6], [5,6,7], [6,7,8]
.
Example, a moving average filter implemented with foldw
:
let moving_average ss n =
  Float.(foldw ss n ~init:zero ~f:(+) >>| fun s -> s / of_int n)
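The window arithmetic can be verified on ordinary lists. The sketch below is a list-based model using only the OCaml standard library, not the library's implementation: `take`, `drop`, `foldw_list`, `sums`, `strided` and `moving_average_list` are hypothetical names, and the model assumes that an incomplete trailing window produces no output.

```ocaml
(* List-based model of a windowed fold: NOT the library's implementation. *)
let rec take n = function
  | x :: xs when n > 0 -> x :: take (n - 1) xs
  | _ -> []

let rec drop n = function
  | _ :: xs when n > 0 -> drop (n - 1) xs
  | xs -> xs

let foldw_list ?(stride = 1) xs n ~init ~f =
  if n <= 0 || stride <= 0 then invalid_arg "foldw_list";
  let rec go xs =
    let window = take n xs in
    if List.length window < n then []      (* not enough elements left *)
    else List.fold_left f init window :: go (drop stride xs)
  in
  go xs

(* Windows [1,2,3], [2,3,4], ..., [6,7,8], each summed. *)
let sums = foldw_list [1; 2; 3; 4; 5; 6; 7; 8] 3 ~init:0 ~f:( + )

(* With a stride of 2 the windows are [1,2,3], [3,4,5], [5,6,7]. *)
let strided = foldw_list ~stride:2 [1; 2; 3; 4; 5; 6; 7; 8] 3 ~init:0 ~f:( + )

(* The moving average from the text, restated for lists. *)
let moving_average_list xs n =
  List.map (fun s -> s /. float_of_int n)
    (foldw_list xs n ~init:0. ~f:( +. ))
```

A stride of one gives maximally overlapping windows, while a stride equal to the window length gives disjoint windows.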
frame ~clk s ~init ~f
will gather elements of s
into frames, where the start of the new frame is signaled by a stream clk
. The function is very similar to foldw
except, that the window is determined dynamically by a clk
stream. This function is useful to build custom time scales.
The semantics of the function can be clarified with the following description:
1. Every time a stream s
produces a value, it is buffered.
2. Every time a stream clk
produces a value, a function f
is folded over all buffered values, and the result is put into the output stream. The internal buffer is cleared afterwards.
Example
Consider the following timing diagram, where each row represents a stream, and columns represent time. Elements of the clk
stream are depicted with a T
symbol.
clk:    T          T       TT     T  T
ss:  123  56 123 12  1234 4   1234  1
will be framed in the following way:
[123], [5612312], [12344], [], [1234], [1]
Note: since all streams are serialized, two events cannot occur at the same time. So each column of the timing diagram contains at most one event.
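The two buffering rules can be replayed on a serialized event list, which also reproduces the framing shown above. This is a sketch using only the OCaml standard library, not the library's implementation: the `event` type and the names `frame_list`, `values`, `timeline` and `frames` are hypothetical.

```ocaml
(* Event-list model of frame: NOT the library's implementation.
   A serialized timeline is a list of events, one per diagram column. *)
type 'a event = Tick | Value of 'a

let frame_list events ~init ~f =
  (* Fold f over the buffered values in arrival order. *)
  let flush buf = List.fold_left f init (List.rev buf) in
  let _, out =
    List.fold_left (fun (buf, out) ev ->
        match ev with
        | Value x -> (x :: buf, out)         (* rule 1: buffer the value *)
        | Tick -> ([], flush buf :: out))    (* rule 2: fold, emit, clear *)
      ([], []) events in
  List.rev out

let values xs = List.map (fun x -> Value x) xs

(* The timeline of the example: a tick closes each frame. *)
let timeline =
  values [1; 2; 3] @ [Tick]
  @ values [5; 6; 1; 2; 3; 1; 2] @ [Tick]
  @ values [1; 2; 3; 4; 4] @ [Tick; Tick]
  @ values [1; 2; 3; 4] @ [Tick]
  @ values [1] @ [Tick]

(* Collect each frame as a list, preserving arrival order. *)
let frames = frame_list timeline ~init:[] ~f:(fun acc x -> acc @ [x])
```

Two ticks with no value between them yield an empty frame, which is why the fourth frame in the example is `[]`.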
sample ~clk ss
is semantically the same as frame ~clk ss >>| fst
hd s
returns a future
that will occur as soon as stream s
produces a value. Note: if hd
is called on a stream that has already produced some values, the returned future will still be fulfilled by the first value that is put into the stream after the future is created.
find xs f
returns a future that will be fulfilled with the first value for which a function f
is true
.
find_map xs f
returns a future that will be fulfilled with the result of the first invocation of f
on an element of the stream that evaluates to Some
value.
take xs n
returns a future that will evaluate to n
values of the stream xs
that occur after the future is created.
nth xs n
returns n
'th element of the stream xs
. The element is n
'th with respect to the future f
, if it was the n
'th element produced by the stream after the creation of the future.
upon e xs
returns a future that will be fulfilled with the last value of a stream xs
before an event e
has occurred. If at the time when the event e
occurs, the stream xs
has not produced any elements, then the future will not be fulfilled.
before e xs
returns a list that contains elements of the stream xs
that occurred before the event e