Std.Stream
A stream of elements.
A stream is an infinite sequence of elements. Since the stream is defined coinductively, it can be processed only corecursively. That means that, in general, one cannot aggregate a stream into normal (inductive) data. But it is still possible to get an aggregate snapshot of some intermediate state in the form of futures.
Streams can be observed and combined. There is no built-in notion of the end of a stream, and all streams are considered infinite. It is still possible to simulate the end of a stream by using a future that designates the end-of-stream condition.
Streams can be made lazy in the sense that if no one is watching a stream, then no work should be performed to feed the stream. This requires some cooperation from the feeder, as it should use the on_subscribe and on_unsubscribe functions to react to users' subscriptions. The has_subscribers function is also useful.
Streams also provide a pushback mechanism that allows a consumer to limit the rate of a cooperative producer. The pushback interface consists of two functions:
wait, which should be called by a consumer when it wants to ask a producer to wait for a moment;
on_wait, which is called when any consumer requests a pause.
The pushback interface is not mandatory.
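The pushback handshake described above can be sketched with a toy model written against the OCaml standard library only. This is not the library's implementation: the record type, `send`, and the "skip one value" policy of the producer are all assumptions made for illustration. Only the roles of `wait` and `on_wait` follow the description.

```ocaml
(* Toy model of pushback; every name here is hypothetical. *)
type 'a stream = {
  mutable observers : ('a -> unit) list;     (* consumer callbacks *)
  mutable wait_hooks : (unit -> unit) list;  (* producer-side pause handlers *)
}

let create () = { observers = []; wait_hooks = [] }
let observe s f = s.observers <- f :: s.observers
let on_wait s f = s.wait_hooks <- f :: s.wait_hooks
let wait s = List.iter (fun f -> f ()) s.wait_hooks
let send s x = List.iter (fun f -> f x) s.observers

let paused = ref false
let received = ref []
let s : int stream = create ()

(* The producer registers its interest in pause requests. *)
let () = on_wait s (fun () -> paused := true)

(* The consumer asks for a pause after it has seen the value 2. *)
let () = observe s (fun x ->
    received := x :: !received;
    if x = 2 then wait s)

(* A cooperative producer: it drops one value whenever a pause was requested. *)
let produce x =
  if !paused then paused := false
  else send s x

let () = List.iter produce [1; 2; 3; 4]
let result = List.rev !received  (* [1; 2; 4] *)
```

A producer that never calls `on_wait` would simply ignore the requests, which matches the statement that the interface is not mandatory.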
type 'a t = 'a stream
module Variadic : Variadic.S with type 'a arg = 'a t
Applying function to multiple streams.
create () returns a stream and a signal handler that is used to feed the stream. Every time a value is signaled, it will occur in the stream.
from f returns a stream that is generated from successive applications of a function f. A new value is produced by the stream every time it is signaled with the associated signal handler.
unfold ~init ~f is a more general way of building a stream than from: it allows state to be passed between consecutive invocations of the generator function. A new value is produced by the stream every time it is signaled with the associated signal handler.
unfold_until ~init ~f returns (stream,signal,future). It is the same as unfold, except that the function f is called until it returns None. Once this happens, the future becomes determined.
unfold' ~init ~f is a batched version of the unfold function. A new value is produced by the stream every time it is signaled with the associated signal handler.
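To illustrate how state flows through the unfold family, here is a minimal sketch of unfold_until as a closure over mutable state. It is a model under assumptions, not the library's code: the generator type `'s -> ('a * 's) option` is assumed, `step` stands in for sending the signal, `produced` for observing the stream, and `finished` for the future. Ignoring signals after the end is also an assumption of this sketch.

```ocaml
(* Toy model of unfold_until; [step], [produced] and [finished] are
   hypothetical stand-ins for the signal, the stream and the future. *)
let unfold_until ~init ~f =
  let state = ref (Some init) in
  let out = ref [] in
  let is_finished = ref false in
  let step () =
    match !state with
    | None -> ()  (* already finished: further signals are ignored *)
    | Some s ->
      match f s with
      | Some (x, s') -> out := x :: !out; state := Some s'
      | None -> state := None; is_finished := true in
  step, (fun () -> List.rev !out), (fun () -> !is_finished)

(* Count down from 3, then stop. *)
let step, produced, finished =
  unfold_until ~init:3 ~f:(fun n -> if n = 0 then None else Some (n, n - 1))

let () = for _ = 1 to 5 do step () done
let values = produced ()   (* [3; 2; 1] *)
let is_done = finished ()  (* true *)
```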
repeat x returns a stream xs and a signal s. Every time s is signaled, the stream xs will produce a value x.
of_list xs returns a stream ss, a signal s and a future es. The stream will produce the elements of xs consecutively, one each time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All subsequent signals are ignored.
of_array xs returns a stream ss, a signal s and a future es. The stream will produce the elements of xs consecutively, one each time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All subsequent signals are ignored.
of_seq xs returns a stream ss, a signal s and a future es. The stream will produce the elements of xs consecutively, one each time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All subsequent signals are ignored.
In order to start to monitor a stream, a user should subscribe to the stream using one of the subscription functions: watch, observe, subscribe.
The subscription can be canceled by using the unsubscribe function, or by throwing an exception from the callback function. The latter plays well with the `with_return` function.
watch s f watches a stream s with a function f. A subscription identifier is passed to the function, so it can be used to unsubscribe from the stream directly from the function.
val observe : 'a t -> ('a -> unit) -> unit
observe s f is like watch but a subscription identifier is not passed to the function f.
subscribe s f subscribes to a stream s with a function f. A subscription identifier is returned.
unsubscribe s id stops calling the function that has the provided identifier id.
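The subscription life cycle can be sketched with a small stdlib-only model. It is an illustration under assumptions, not the library's implementation: identifiers are modeled as integers, and `send` is a hypothetical stand-in for the signal handler returned by create.

```ocaml
(* Toy model of subscribe/unsubscribe; the representation is hypothetical. *)
type 'a stream = {
  mutable subs : (int * ('a -> unit)) list;  (* identifier, callback *)
  mutable next_id : int;
}

let create () =
  let s = { subs = []; next_id = 0 } in
  (* Deliver a value to every subscriber, oldest subscription first. *)
  let send x = List.iter (fun (_, f) -> f x) (List.rev s.subs) in
  s, send

let subscribe s f =
  let id = s.next_id in
  s.next_id <- id + 1;
  s.subs <- (id, f) :: s.subs;
  id

let unsubscribe s id =
  s.subs <- List.filter (fun (i, _) -> i <> id) s.subs

let has_subscribers s =
  match s.subs with [] -> false | _ -> true

let log = ref []
let s, send = create ()
let id = subscribe s (fun x -> log := x :: !log)
let () = send 1; send 2
let () = unsubscribe s id
let () = send 3  (* nobody is watching anymore *)
let observed = List.rev !log  (* [1; 2] *)
```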
val wait : 'a t -> unit
wait xs is a polite way to ask a producer to slow down. Note: the producer is not required to obey.
val has_subscribers : 'a t -> bool
has_subscribers s is true if someone is watching the stream s.
on_subscribe s f will call a function f every time someone subscribes to a stream s.
on_unsubscribe s f will call a function f every time someone cancels a subscription to a stream s.
val on_wait : 'a t -> (unit -> unit) -> unit
on_wait s f will call a function f every time someone watching a stream s calls wait s to ask the producer to slow down.
s' = map' s ~f applies the function f to each value of a stream s and pushes the values from the resulting queue into the stream s'. Example:
let q,p,_ = of_list ['a';'b';'c';'.']
let q' = map' q ~f:(function
    | 'a'..'z' as c ->
      Queue.of_list Char.[uppercase c; lowercase c]
    | c -> Queue.singleton c)
Will produce:
<A; a; B; b; C; c; .>
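The batching behavior of map' can be checked on a plain list model, where the stream is a list and each queue is a list as well. This is only a sketch of the semantics using the standard library; `expand` is a hypothetical helper, and the real function works on streams and queues.

```ocaml
(* List model of map': each element expands to a batch, and the batches
   are flattened in order. *)
let map' xs ~f = List.concat_map f xs

let expand = function
  | 'a'..'z' as c -> [Char.uppercase_ascii c; c]
  | c -> [c]

let result = map' ['a'; 'b'; 'c'; '.'] ~f:expand
(* ['A'; 'a'; 'B'; 'b'; 'C'; 'c'; '.'] *)
```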
map ss ~f returns a new stream that is built by applying a function f to each element of the stream ss.
filter_map s ~f for each value x in stream s, produces y if f x is Some y, otherwise ignores x.
filter s f produces a stream that contains the elements of stream s for which f evaluates to true.
either xs ys is a discriminated union of two streams.
merge xs ys f merges streams xs and ys using function f. To merge an arbitrary number of streams use the Variadic interface.
apply fs xs applies a stream of functions fs to a stream of values xs, producing a stream of results.
concat ss returns a stream that will produce elements from the input list of streams ss. The ordering of the elements of different streams is unspecified, though it is guaranteed that elements of the same stream will preserve their ordering.
concat_merge xs ~f builds a stream that produces elements from the input list of streams xs, applying f to all consecutive elements. The ordering of the input list does not determine the ordering of elements in the output stream, which is undefined. See concat for more information.
split xs ~f returns a pair of streams, where the first stream contains fst (f x) for each x in xs and the second stream contains snd (f x) for each x in xs.
zip xs ys creates a stream that will produce an element (x,y) every time both xs and ys produce elements x and y respectively.
unzip xs creates a pair of streams, where the first stream contains fst x for each x in xs and the second stream contains snd x for each x in xs. Essentially, the same as split ~f:Fn.id.
once xs creates a stream that will at most contain the next value produced by xs and nothing more.
parse ss ~init ~f parses stream ss and builds a new stream ss'. Function f is applied to each consecutive element of the stream ss with a state s. If function f returns None,s', then no value is produced in the output stream and state s' is passed to the next invocation of function f. If it returns Some x, s', then value x is produced by the output stream and state s' is passed to the next invocation of f. If the state type 'b is a list type, then parse behaves as a pushdown automaton. With an arbitrary state type it is possible to build automata that fall between a PDA and a Turing machine (not including the latter).
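The state-threading rule of parse can be modeled over a list. The following is a minimal sketch using only the standard library, assuming that f receives the state first and the element second; the `pairs` helper is hypothetical and only demonstrates a one-slot state.

```ocaml
(* List model of parse: thread a state through the elements and keep
   only the values returned as [Some]. *)
let parse xs ~init ~f =
  let _, out =
    List.fold_left (fun (s, out) x ->
        match f s x with
        | None, s' -> s', out
        | Some y, s' -> s', y :: out)
      (init, []) xs in
  List.rev out

(* Group elements into pairs; the state remembers an unpaired element. *)
let pairs xs =
  parse xs ~init:None ~f:(fun s x ->
      match s with
      | None -> None, Some x
      | Some prev -> Some (prev, x), None)

let result = pairs [1; 2; 3; 4; 5]
(* [(1, 2); (3, 4)]; the trailing 5 stays in the state *)
```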
foldw ss n ~init ~f performs a windowed fold of the stream. A function f is folded over n consecutive elements of ss, then the result is produced into the output stream, the window is shifted by stride (defaults to one) and function f is applied to the next n elements. For example, if stream ss produced the following sequence of elements:
1,2,3,4,5,6,7,8
and the window length n is equal to 3, then the function f will be applied to the sequences:
[1,2,3], [2,3,4], [3,4,5], [4,5,6], [5,6,7], [6,7,8].
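The windows listed above can be reproduced with a list model of foldw. This sketch uses only the standard library and treats the stream as a finite list, so it is an illustration of the windowing rule rather than the library's streaming implementation.

```ocaml
(* List model of foldw: fold f over every full window of length n,
   moving the window by [stride] elements each time. *)
let foldw ?(stride = 1) xs n ~init ~f =
  if n <= 0 || stride <= 0 then invalid_arg "foldw: n and stride must be positive";
  let rec take k = function
    | x :: rest when k > 0 -> x :: take (k - 1) rest
    | _ -> [] in
  let rec drop k = function
    | _ :: rest when k > 0 -> drop (k - 1) rest
    | l -> l in
  let rec go xs acc =
    let w = take n xs in
    if List.length w < n then List.rev acc  (* no full window is left *)
    else go (drop stride xs) (List.fold_left f init w :: acc) in
  go xs []

let sums = foldw [1; 2; 3; 4; 5; 6; 7; 8] 3 ~init:0 ~f:(+)
(* [6; 9; 12; 15; 18; 21], one sum per window *)
```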
Example: a moving average filter implemented with foldw:
let moving_average ss n =
  Float.(foldw ss n ~init:zero ~f:(+) >>| fun s -> s / of_int n)
frame ~clk s ~init ~f will gather elements of s into frames, where the start of a new frame is signaled by a stream clk. The function is very similar to foldw, except that the window is determined dynamically by the clk stream. This function is useful to build custom time scales.
The semantics of the function can be clarified with the following description:
1. Every time a stream s produces a value, it is buffered.
2. Every time a stream clk produces a value, the function f is folded over all buffered values, and the result is put into the output stream. The internal buffer is cleared afterwards.
Example
-------
Consider the following timing diagram, where each row represents a stream, and columns represent time. Elements of the clk stream are depicted with a T symbol.
          clk:    T         T        T  T      T     T
           ss: 123 56 123 12  1234 4      1234  1
will be framed in the following way:
[123], [5612312], [12344], [], [1234], [1]
Note: since all streams are serialized, two events cannot occur at the same time. So each column of the timing diagram can contain only one event.
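The two buffering rules and the timing diagram can be replayed with a list model in which the serialized events of both streams form a single list. This is a stdlib-only sketch of the semantics; the `event` type and the `vals` helper are hypothetical and do not exist in the library.

```ocaml
(* Serialized events: either a clock tick or a value of the data stream. *)
type 'a event = Tick | Value of 'a

(* List model of frame: buffer values, fold and flush on every tick. *)
let frame events ~init ~f =
  let _, out =
    List.fold_left (fun (buf, out) -> function
        | Value x -> x :: buf, out
        | Tick -> [], List.fold_left f init (List.rev buf) :: out)
      ([], []) events in
  List.rev out

let vals xs = List.map (fun x -> Value x) xs

(* The timing diagram, read column by column. *)
let events =
  vals [1; 2; 3] @ [Tick]
  @ vals [5; 6; 1; 2; 3; 1; 2] @ [Tick]
  @ vals [1; 2; 3; 4; 4] @ [Tick; Tick]
  @ vals [1; 2; 3; 4] @ [Tick]
  @ vals [1] @ [Tick]

(* Collect each frame as a list, preserving arrival order. *)
let frames = frame events ~init:[] ~f:(fun acc x -> acc @ [x])
(* [[1;2;3]; [5;6;1;2;3;1;2]; [1;2;3;4;4]; []; [1;2;3;4]; [1]] *)
```

Two ticks in a row yield an empty frame, because the buffer was already cleared by the first one.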
sample ~clk ss is semantically the same as frame ~clk ss >>| fst
hd s returns a future that will occur as soon as the stream s produces a value. Note: if hd is called on a stream that has already produced some values, the returned future will still be fulfilled with the first value that is put into the stream after the future is created.
find xs f returns a future that will be fulfilled with the first value for which the function f is true.
find_map xs f returns a future that will be fulfilled with the result of the first application of f to an element of the stream that evaluates to Some value.
take xs n returns a future that will evaluate to n values of the stream xs that have occurred after the future was created.
nth xs n returns the n'th element of the stream xs. The element is n'th with respect to the returned future f if it was the n'th element of the stream after the creation of the future.
upon e xs returns a future that will be fulfilled with the last value of a stream xs before an event e has occurred. If the stream xs has not produced any elements by the time the event e occurs, then the future will not be fulfilled.
before e xs returns a list that contains the elements of the stream xs that occurred before the event e.