Module Std.Stream

A stream of elements.

A stream is an infinite sequence of elements. Since the stream is defined coinductively it can be processed only corecursively. That means that in general, one cannot aggregate a stream into normal (inductive) data. But it is still possible to get an aggregate snapshot of some intermediate state in the form of futures.

Streams can be observed and combined. There is no built-in notion of the end of a stream, and all streams are considered infinite. It is still possible to simulate the end of a stream by using a future that designates the end-of-stream condition.

Streams can be made lazy in the sense that if no one is watching a stream, then no work should be performed to feed the stream. This requires some cooperation from the feeder, which should use the on_subscribe and on_unsubscribe functions to react to users' subscriptions. The has_subscribers function is also useful.

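Streams also provide a pushback mechanism that allows a consumer to ask a cooperative producer to limit its rate. The pushback interface consists of two functions: wait, which a consumer calls to ask the producer to slow down, and on_wait, which a producer uses to register a function that is called on every such request.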

The pushback interface is not mandatory.

type 'a t = 'a stream
type id
module Variadic : Variadic.S with type 'a arg = 'a t

Applying a function to multiple streams.

val create : unit -> 'a t * 'a signal

create () returns a stream and a signal handler that is used to feed the stream. Every time a value is signaled, it will occur in the stream.
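The relation between a stream and its signal can be pictured with a toy model. The sketch below is not the library's implementation: the names toy_stream, toy_create and toy_observe are hypothetical, and the signal is modeled as a plain function. It only illustrates that every signaled value is delivered to the observers of the stream.

```ocaml
(* A toy model of a push-based stream; NOT the library's implementation.
   All names here are hypothetical. *)
type 'a toy_stream = { mutable observers : ('a -> unit) list }

(* The "signal" is modeled as a plain function that feeds the stream. *)
let toy_create () : 'a toy_stream * ('a -> unit) =
  let s = { observers = [] } in
  (* deliver the value to the observers in subscription order *)
  let send x = List.iter (fun f -> f x) (List.rev s.observers) in
  (s, send)

(* Register a callback that is called on every signaled value. *)
let toy_observe s f = s.observers <- f :: s.observers

(* Usage: values occur in the stream in the order they are signaled. *)
let seen = ref []

let () =
  let stream, send = toy_create () in
  toy_observe stream (fun x -> seen := x :: !seen);
  List.iter send [1; 2; 3]

let observed = List.rev !seen
```

In this model a value that is signaled while nobody observes the stream is simply lost, which matches the push-based reading of the description above.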

val from : (unit -> 'a) -> 'a t * unit signal

from f returns a stream that is generated from successive applications of the function f. The stream produces a new value every time it is signaled with the associated signal handler.

val unfold : init:'b -> f:('b -> 'a * 'b) -> 'a t * unit signal

unfold ~init ~f is a more general way of building a stream than from, as it allows state to be passed between consecutive invocations of the generator function. The stream produces a new value every time it is signaled with the associated signal handler.
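The way state is threaded through the generator can be sketched with a small closure-based model. This is an illustration only, not the library's implementation: unfold_model is a hypothetical name, and the closure it returns plays the role of the signal.

```ocaml
(* A toy model of unfold; NOT the library's implementation.
   The returned closure plays the role of the signal: each call
   produces the next value and advances the hidden state. *)
let unfold_model ~init ~f =
  let state = ref init in
  fun () ->
    let x, next_state = f !state in
    state := next_state;
    x

(* Usage: squares of the natural numbers, one per "signal". *)
let next_square = unfold_model ~init:0 ~f:(fun n -> (n * n, n + 1))
let first = next_square ()
let second = next_square ()
let third = next_square ()
```

The state (here the counter n) is never visible to the consumer, who sees only the produced values.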

val unfold_until : init:'b -> f:('b -> ('a * 'b) option) -> 'a t * unit signal * unit future

unfold_until ~init ~f returns (stream,signal,future). It is the same as unfold, except that the function f is called until it returns None. Once this happens, the future becomes determined.

val unfold' : init:'b -> f:('b -> 'a Core_kernel.Queue.t * 'b) -> 'a t * unit signal

unfold' ~init ~f is a batched version of the unfold function: on each invocation the function f returns a queue of values instead of a single value. New values are produced by the stream every time it is signaled with the associated signal handler.

val repeat : 'a -> 'a t * unit signal

repeat x returns a stream xs and a signal s. Every time s is signaled, the stream xs will produce the value x.

val of_list : 'a list -> 'a t * unit signal * unit future

of_list xs returns a stream ss, a signal s and a future es. The stream produces consecutive elements of xs every time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All signals sent after es has occurred are ignored.

val of_array : 'a array -> 'a t * unit signal * unit future

of_array xs returns a stream ss, a signal s and a future es. The stream produces consecutive elements of xs every time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All signals sent after es has occurred are ignored.

val of_sequence : 'a Core_kernel.Sequence.t -> 'a t * unit signal * unit future

of_sequence xs returns a stream ss, a signal s and a future es. The stream produces consecutive elements of xs every time the signal s is sent. Once all elements are produced, the future es will occur, signifying the end of the underlying sequence. All signals sent after es has occurred are ignored.

Subscriber interface

In order to start to monitor a stream, a user should subscribe to the stream using one of the subscription functions: watch, observe, subscribe.

The subscription can be canceled by using the unsubscribe function, or by throwing an exception from the callback function. The latter plays well with the with_return function.

val watch : 'a t -> (id -> 'a -> unit) -> unit

watch s f watches a stream s with a function f. A subscription identifier is passed to the function, so it can be used to unsubscribe from the stream directly from the function.

val observe : 'a t -> ('a -> unit) -> unit

observe s f is like watch but a subscription identifier is not passed to the function f.

val subscribe : 'a t -> ('a -> unit) -> id

subscribe s f subscribes to a stream s with a function f. A subscription identifier is returned.

val unsubscribe : 'a t -> id -> unit

unsubscribe s id stops calling the function that was subscribed to the stream s under the identifier id.

val wait : 'a t -> unit

wait xs is a polite way to ask a producer to slow down. Note: the producer is not required to obey.

Publisher interface

val has_subscribers : 'a t -> bool

has_subscribers s is true if someone is watching the stream s.

val on_subscribe : 'a t -> (id -> unit) -> unit

on_subscribe s f will call the function f every time someone subscribes to the stream s.

val on_unsubscribe : 'a t -> (id -> unit) -> unit

on_unsubscribe s f will call the function f every time someone cancels a subscription to the stream s.

val on_wait : 'a t -> (unit -> unit) -> unit

on_wait s f will call the function f every time someone watching the stream s calls wait s to ask the producer to slow down.

Combinators

val map' : 'a t -> f:('a -> 'b Core_kernel.Queue.t) -> 'b t

s' = map' s ~f applies the function f to each value of the stream s and pushes the values from the resulting queue into the stream s'. Example:

(* of_list returns a stream, a signal and an end-of-list future *)
let q,p,e = of_list ['a'; 'b'; 'c'; '.']
let q' = map' q ~f:(function
    | 'a'..'z' as c ->
      Queue.of_list Char.[uppercase c; lowercase c]
    | c -> Queue.singleton c)

Will produce:

<A; a; B; b; C; c; .>
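The flattening behaviour of map' can be checked with a list-based model, where plain lists stand in for both the stream and the queues returned by f. This is only a sketch of the semantics, not the library's implementation; map_model and expand are hypothetical names, and List.concat_map requires OCaml 4.10 or later.

```ocaml
(* A list-based model of map'; NOT the library's implementation.
   Lists stand in for both the stream and the queues returned by f. *)
let map_model xs ~f = List.concat_map f xs

(* The same function as in the example above, returning lists. *)
let expand = function
  | 'a' .. 'z' as c -> [Char.uppercase_ascii c; Char.lowercase_ascii c]
  | c -> [c]

let result = map_model ['a'; 'b'; 'c'; '.'] ~f:expand
```

Each input element contributes zero or more output elements, and the order of the input is preserved.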

val map : 'a t -> f:('a -> 'b) -> 'b t

map ss ~f returns a new stream that is built by applying the function f to each element of the stream ss.

val filter_map : 'a t -> f:('a -> 'b option) -> 'b t

filter_map s ~f produces, for each value x in the stream s, a value y if f x is Some y, and ignores x otherwise.

val filter : 'a t -> f:('a -> bool) -> 'a t

filter s ~f produces a stream that contains the elements of the stream s for which f evaluates to true.

val either : 'a t -> 'b t -> ('a, 'b) Core_kernel.Either.t t

either xs ys is a discriminated union of two streams.

val merge : 'a t -> 'b t -> f:('a -> 'b -> 'c) -> 'c t

merge xs ys ~f merges the streams xs and ys using the function f. To merge an arbitrary number of streams, use the Variadic interface.

val apply : ('a -> 'b) t -> 'a t -> 'b t

apply fs xs applies a stream of functions fs to a stream of values xs, producing a stream of results.

val concat : 'a t list -> 'a t

concat ss returns a stream that will produce elements from the input list of streams ss. The ordering of the elements of different streams is unspecified, though it is guaranteed that elements of the same stream will preserve their ordering.

val concat_merge : 'a t list -> f:('a -> 'a -> 'a) -> 'a t

concat_merge xs ~f builds a stream that will produce elements from the input list of streams and applies f to all consecutive elements. The ordering of the input list does not determine the ordering of elements in the output stream, which is undefined. See concat for more information.

val split : 'a t -> f:('a -> 'b * 'c) -> 'b t * 'c t

split xs ~f returns a pair of streams, where the first stream contains fst (f x) for each x in xs and the second stream contains snd (f x) for each x in xs.

val zip : 'a t -> 'b t -> ('a * 'b) t

zip xs ys creates a stream that will produce an element (x,y) every time both xs and ys produce elements x and y respectively.

val unzip : ('a * 'b) t -> 'a t * 'b t

unzip xs creates a pair of streams, where the first stream contains fst x for each x in xs and the second stream contains snd x for each x in xs. Essentially, the same as split ~f:Fn.id

val once : 'a t -> 'a t

once xs creates a stream that will at most contain the next value produced by xs and nothing more.

val parse : 'a t -> init:'b -> f:('b -> 'a -> 'c option * 'b) -> 'c t

parse ss ~init ~f parses the stream ss and builds a new stream ss'. The function f is applied to each consecutive element of the stream ss together with a state s. If f returns None, s', then no value is produced in the output stream, and the state s' is passed to the next invocation of f. If it returns Some x, s', then the value x is produced by the output stream, and the state s' is passed to the next invocation of f. If the state type 'b is a list type, then parse acts as a pushdown automaton. With an arbitrary state type it is possible to build automata that fall between a PDA and a Turing machine (not including the latter).
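The state-passing contract of parse can be sketched on lists. The model below is an illustration under the assumption that a finite list stands in for a prefix of the stream; parse_model and tokenize are hypothetical names and not part of the library.

```ocaml
(* A list-based model of parse; NOT the library's implementation. *)
let parse_model xs ~init ~f =
  let _, out =
    List.fold_left (fun (state, out) x ->
        match f state x with
        | None, state' -> (state', out)           (* nothing is produced *)
        | Some y, state' -> (state', y :: out))   (* y is produced *)
      (init, []) xs in
  List.rev out

(* Usage: group a stream of digits into numbers, using any non-digit
   as a separator. The state is the number accumulated so far, if any. *)
let tokenize chars =
  parse_model chars ~init:None ~f:(fun acc c ->
      match c, acc with
      | '0' .. '9', None -> (None, Some (Char.code c - Char.code '0'))
      | '0' .. '9', Some n ->
        (None, Some (n * 10 + Char.code c - Char.code '0'))
      | _, Some n -> (Some n, None)
      | _, None -> (None, None))

let numbers = tokenize ['1'; '2'; ' '; '7'; ' '; ' '; '4'; '0'; ' ']
```

A number is emitted only when its separator arrives, since a stream has no end at which a pending state could be flushed.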

val foldw : ?stride:int -> 'a t -> int -> init:'b -> f:('b -> 'a -> 'b) -> 'b t

foldw ss n ~init ~f performs a windowed fold of the stream. The function f is folded over n consecutive elements of ss, then the result is produced into the output stream, the window is shifted by stride (defaults to one), and the function f is applied to the next n elements. For example, if the stream ss produced the following sequence of elements:

1,2,3,4,5,6,7,8

and the window length n is equal to 3, then the function f will be applied to the following sequences:

[1,2,3], [2,3,4], [3,4,5], [4,5,6], [5,6,7], [6,7,8]
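The windowing described above can be reproduced with a list-based model. This is a sketch and not the library's implementation: foldw_model is a hypothetical name, a finite list stands in for a prefix of the stream, and the treatment of stride follows the description above (the window start moves by stride elements).

```ocaml
(* A list-based model of foldw; NOT the library's implementation. *)
let foldw_model ?(stride = 1) xs n ~init ~f =
  if n <= 0 || stride <= 0 then invalid_arg "foldw_model";
  (* take k xs acc returns the first k elements, or None if xs is shorter *)
  let rec take k xs acc =
    if k = 0 then Some (List.rev acc)
    else match xs with
      | [] -> None
      | x :: rest -> take (k - 1) rest (x :: acc) in
  (* drop k xs removes up to k leading elements *)
  let rec drop k xs =
    if k = 0 then xs
    else match xs with
      | [] -> []
      | _ :: rest -> drop (k - 1) rest in
  (* fold f over every complete window, shifting by stride each time *)
  let rec loop xs out =
    match take n xs [] with
    | None -> List.rev out
    | Some window ->
      loop (drop stride xs) (List.fold_left f init window :: out) in
  loop xs []

(* Usage: sums over windows of length 3. *)
let sums = foldw_model [1; 2; 3; 4; 5; 6; 7; 8] 3 ~init:0 ~f:( + )
let strided =
  foldw_model ~stride:3 [1; 2; 3; 4; 5; 6; 7; 8] 3 ~init:0 ~f:( + )
```

With the default stride the windows overlap, and with a stride equal to the window length they become disjoint.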

Example, a moving average filter implemented with foldw:

let moving_average ss n =
  Float.(foldw ss n ~init:zero ~f:(+) >>| fun s -> s / of_int n)

val frame : clk:unit t -> 'a t -> init:'b -> f:('b -> 'a -> 'b) -> 'b t

frame ~clk s ~init ~f will gather elements of s into frames, where the start of a new frame is signaled by the stream clk. The function is very similar to foldw, except that the window is determined dynamically by the clk stream. This function is useful for building custom time scales.

The semantics of the function can be clarified with the following description:

1. Every time the stream s produces a value, it is buffered.
2. Every time the stream clk produces a value, the function f is folded over all buffered values, and the result is put into the output stream. The internal buffer is cleared afterwards.

Example

Consider the following timing diagram, where each row represents a stream, and columns represent time. Elements of the clk stream are depicted with a T symbol.

          clk:    T         T        T  T      T     T
           ss: 123 56 123 12  1234 4      1234  1

will be framed in the following way:

[123], [5612312], [12344], [], [1234], [1]

Note: since all streams are serialized, it is impossible for two events to occur at the same time. So there can be only one event in any column of the timing diagram.
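The buffering rule can be modeled on a single list of serialized events. The sketch below is not the library's implementation: the event type and frame_model are hypothetical, and one list of events stands in for the interleaving of the clk stream and the data stream.

```ocaml
(* A list-based model of frame; NOT the library's implementation.
   A serialized run of both streams is modeled as one list of events. *)
type 'a event =
  | Tick          (* an element of the clk stream *)
  | Value of 'a   (* an element of the data stream *)

let frame_model events ~init ~f =
  let step (buffer, out) = function
    | Value x -> (x :: buffer, out)
    | Tick ->
      (* fold f over the buffered values in arrival order, then clear *)
      ([], List.fold_left f init (List.rev buffer) :: out) in
  let _, out = List.fold_left step ([], []) events in
  List.rev out

(* Usage: collect each frame into a list. *)
let frames =
  frame_model
    [Value 1; Value 2; Tick; Value 5; Value 6; Value 1; Tick; Tick;
     Value 4; Tick]
    ~init:[] ~f:(fun acc x -> acc @ [x])
```

Two consecutive ticks yield an empty frame, as in the fourth frame of the diagram, and values that arrive after the last tick stay in the buffer.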

val sample : clk:unit t -> 'a t -> 'a option t

sample ~clk ss is semantically the same as frame ~clk ss >>| fst

val hd : 'a t -> 'a future

hd s returns a future that will occur as soon as the stream s produces a value. Note: if hd is called on a stream that has already produced some values, the returned future will still be fulfilled with the first value that is put into the stream after the future is created.

val tl : 'a t -> 'a t

tl s ignores the next occurrence in the stream s

val find : 'a t -> f:('a -> bool) -> 'a future

find xs ~f returns a future that will be fulfilled with the first value for which the function f is true.

val find_map : 'a t -> f:('a -> 'b option) -> 'b future

find_map xs ~f returns a future that will be fulfilled with the result of the first application of f to an element of the stream that evaluates to Some value.

val take : 'a t -> int -> 'a list future

take xs n returns a future that will evaluate to a list of n values of the stream xs that occur after the future is created.

val nth : 'a t -> int -> 'a future

nth xs n returns a future f that will be fulfilled with the n'th element of the stream xs. The element is n'th with respect to the future f, that is, it is the n'th element produced by the stream after the creation of the future.

val upon : unit future -> 'a t -> 'a future

upon e xs returns a future that will be fulfilled with the last value of the stream xs before the event e occurs. If the stream xs has not produced any elements by the time the event e occurs, then the future will not be fulfilled.

val before : unit future -> 'a t -> 'a list future

before e xs returns a future that will be fulfilled with a list of the elements of the stream xs that occurred before the event e.

val last_before : unit future -> 'a t -> int -> 'a list future

last_before e xs n returns a future that will be fulfilled with a list of length up to n, containing the last elements of the stream xs that occurred before the event e.