With the recent release of the Reactive Extensions for .NET (Rx) on DevLabs, you’ll hear quite a bit about reactive programming, based on the IObservable<T> and IObserver<T> interfaces. A great amount of resources is available on Channel 9. In this series, I’ll focus on the dual of the System.Reactive assembly, which is System.Interactive, providing a bunch of extensions to the LINQ Standard Query Operators for IEnumerable<T>. In today’s installment we’ll talk about new combinator operators provided by EnumerableEx:

# Combine and conquer?

Combinators are at the heart of LINQ’s expressive power, allowing sequences to be combined into new ones. In earlier posts, I’ve shown the essence of monadic computation through the following illustration:

It’s fair to say that SelectMany (or Bind) is the mother of all combinators, as many others can be derived from it (Exercise: implement Where and Select using SelectMany and a limited number of auxiliary operators like Return). In today’s post we’ll look at various new combinators added to the IEnumerable<T> set of operators.

So, what’s a combinator? In one world view (the one we’re using), it’s an operator that combines one or more instances of a given entity into a new such entity. For example, in functional programming we got S, K and I combinators that act on functions:

S x y z = (x z) y z
K x y = x
I x = x

A more precise definition can be found on http://en.wikipedia.org/wiki/Combinator, for those interested in more foundational stuff. In our case, we’ll combine one or more IEnumerable<T> instances into a new IEnumerable<R> (where R can be different from T).

# Concat, now with more arguments

LINQ to Objects has always had a Concat operator, with the following signature:

public static IEnumerable<TSource> Concat<TSource>(this IEnumerable<TSource> first, IEnumerable<TSource> second);

However, this is merely a special case of a more general version of Concat, introduced in EnumerableEx:

public static IEnumerable<TSource> Concat<TSource>(params IEnumerable<TSource>[] sources);
public static IEnumerable<TSource> Concat<TSource>(this IEnumerable<IEnumerable<TSource>> sources);

The second one is the core operator we’re talking about here, with the first overload providing convenience due to the lack of a “params enumerable” feature in the language. The Concat operator is simple to understand, simply yielding all TSource objects from all sequences in the sources parameter. If an error occurs during enumeration any of the sequences, the resulting concatenated sequence is also terminated for yielding. In fact, this operator is very similar to OnErrorResumeNext where the error condition is ignored.

Below is a sample illustrating the main scenarios:

new[] {
new[] { 1, 2 },
new[] { 3, 4 },
new[] { 5, 6 }
}
.Concat()
.Materialize(/* for pretty printing */)
.Run(Console.WriteLine);

new[] {
new[] { 1, 2 },
new[] { 3, 4 }.Concat(EnumerableEx.Throw<int>(new Exception())),
new[] { 5, 6 }
}
.Concat()
.Materialize(/* for pretty printing */)
.Run(Console.WriteLine);

The first sample will print numbers 1 through 6, while the second one will print 1 through 4 and an error notification.

# Merge, a parallel Concat

Where Concat will proceed through the sources collection sequentially, guaranteeing in-order retrieval of data, one could get all the data from the sources in a parallel manner as well. To do so, Merge spawns workers that drain all of the sources in parallel, flattening or “sinking” all the results to the caller:

public static IEnumerable<TSource> Merge<TSource>(params IEnumerable<TSource>[] sources);
public static IEnumerable<TSource> Merge<TSource>(this IEnumerable<IEnumerable<TSource>> sources);
public static IEnumerable<TSource> Merge<TSource>(this IEnumerable<TSource> leftSource, IEnumerable<TSource> rightSource);

The three overloads share the same signatures as the Concat equivalents, with the second one being the most general overload again. The same sample as for Concat can be used to illustrate the working:

new[] {
new[] { 1, 2 },
new[] { 3, 4 },
new[] { 5, 6 }
}
.Merge()
.Materialize(/* for pretty printing */)
.Run(Console.WriteLine);

new[] {
new[] { 1, 2 },
new[] { 3, 4 }.Concat(EnumerableEx.Throw<int>(new Exception())),
new[] { 5, 6 }
}
.Merge()
.Materialize(/* for pretty printing */)
.Run(Console.WriteLine);

What the results are will depend on the mood of your task scheduler. Either way, for the first sample you should get to see all of the numbers from 1 through 6 getting printed, in any order (though 1 will come before 2, 3 before 4 and 5 before 6). On my machine I got 1, 3, 5, 4, 2, 6 in my first run. For the second sample, it’s entirely possible to see 5 and 6 getting printed before the exception for the second source is reached. But then that’s what you expect from parallel computation, don’t you?

Merge can speed up your data retrieval operations significantly, if you don’t care about the order in which results are returned. For example, you could cause two LINQ to SQL queries that provide stock quotes to run in parallel by using Merge, followed by a client-side duplicate entry elimination technique:

var stocks =
from quote in
EnumerableEx.Merge(
(from quote in t1 select quote).Do(q => Console.WriteLine("t1: " + q)),
(from quote in t2 select quote).Do(q => Console.WriteLine("t2: " + q))
)
group quote by quote.Symbol into g
select new { g.Key, Price = g.Average(p => p.Price) };

stocks.Run(Console.WriteLine);

Results could look as follows, with the main idea being the parallel retrieval of both query results:

Query: SELECT Symbol, Price FROM Trader1
Query: SELECT Symbol, Price FROM Trader2
t2: { Symbol = MSFT, Price = 30.94 }
t1: { Symbol = MSFT, Price = 30.99 }
t1: { Symbol = ORCL, Price = 24.92 }
t1: { Symbol = GOOG, Price = 618.35 }
t1: { Symbol = AAPL, Price = 209.10 }
t2: { Symbol = ORCL, Price = 25.06 }
t2: { Symbol = GOOG, Price = 610.25 }
t2: { Symbol = AAPL, Price = 204.99 }
{ Key = MSFT, Price = 30.965 }
{ Key = ORCL, Price = 24.99 }
{ Key = GOOG, Price = 614.30 }
{ Key = AAPL, Price = 207.045 }

(Note: behavior in face of an exception will depend on timing and is not included in the diagram.)

# Amb, a racing game

Amb is the ambiguous operator as introduced by McCarthy in 1963. Because of its nostalgic background, it’s been chosen to preserve the name as-is instead of expanding it. What’s so ambiguous about this operator? Well, the idea is that Amb allows two sequences to race to provide the first result causing the winning sequence to be elected as the one providing the resulting sequence from the operator call. The operator’s signatures make this clear:

public static IEnumerable<TSource> Amb<TSource>(params IEnumerable<TSource>[] sources);
public static IEnumerable<TSource> Amb<TSource>(this IEnumerable<IEnumerable<TSource>> sources);
public static IEnumerable<TSource> Amb<TSource>(this IEnumerable<TSource> leftSource, IEnumerable<TSource> rightSource);

Again, the overloads are threesome, just like Concat and Merge. To provide a sample of the operator’s behavior, use the following simple implementation of a Delay operator:

public static IEnumerable<TSource> Delay<TSource>(this IEnumerable<TSource> source, int delay)
{
return EnumerableEx.Defer(() => { Thread.Sleep(delay); return source; });
}

Now we can write the following two test cases:

var src1 = new[] { 1, 2 }.Delay(300);
var src2 = new[] { 3, 4 }.Delay(400);
src1.Amb(src2).Run(Console.WriteLine);

var src3 = new[] { 5, 6 }.Delay(400);
var src4 = new[] { 7, 8 }.Delay(300);
src3.Amb(src4).Run(Console.WriteLine);

The expected result will be that src1 and src4 win their Amb battles against src2 and src3, respectively. One practical use for this operator is to have two or more redundant data sources, all containing the same data, fight to provide the quickest answer to a query. Here’s a sample illustrating this:

var stocks =
EnumerableEx.Amb(
(from quote in t1 select quote).Do(q => Console.WriteLine("t1: " + q)),
(from quote in t2 select quote).Do(q => Console.WriteLine("t2: " + q))
);

stocks.Run(Console.WriteLine);

Results could look as follows, assuming t2 was the quickest to provide an answer:

Query: SELECT Symbol, Price FROM Trader1
Query: SELECT Symbol, Price FROM Trader2
t2: { Symbol = MSFT, Price = 30.94 }
t2: { Symbol = ORCL, Price = 25.06 }
t2: { Symbol = GOOG, Price = 610.25 }
t2: { Symbol = AAPL, Price = 204.99 }
{ Key = MSFT, Price = 30.94 }
{ Key = ORCL, Price = 25.06 }
{ Key = GOOG, Price = 610.25 }
{ Key = AAPL, Price = 204.99 }

# Repeat, again and (maybe) again

The purpose of Repeat is self-explanatory and could be seen as a constructor function as well. Two categories of overloads exists: one that takes a single element and an optional repeat count (unspecified = infinite) and another that takes a sequence and an optional repeat count. While the former is more of a constructor, the latter is more of a combinator over a single input sequence:

public static IEnumerable<TSource> Repeat<TSource>(this IEnumerable<TSource> source);
public static IEnumerable<TSource> Repeat<TSource>(TSource value);
public static IEnumerable<TSource> Repeat<TSource>(this IEnumerable<TSource> source, int repeatCount);
public static IEnumerable<TSource> Repeat<TSource>(TSource value, int repeatCount);

Samples don’t need much further explanation either:

EnumerableEx.Repeat(1).Take(5).Run(Console.WriteLine);
EnumerableEx.Repeat(2, 5).Run(Console.WriteLine);

new[] { 3, 4 }.Repeat().Take(4).Run(Console.WriteLine);
new[] { 5, 6 }.Repeat(2).Run(Console.WriteLine);

It goes almost without saying that an input sequence causing an exception will also terminate the enumeration of a repeated form of the same sequence:

new[] { 5, 6 }.Concat(EnumerableEx.Throw<int>(new Exception())).Repeat(2).Run(Console.WriteLine);

# Zip ‘em together

Introduced in .NET 4.0, I’ve covered the new Zip operator already in my earlier post on C# 4.0 Feature Focus - Part 3 - Intermezzo: LINQ's new Zip operator. Rx ports back this operator to the .NET 3.5 System.Interactive library for consistency. In summary, Zip walks two sequences hand-in-hand, combing their respective yielded elements using a given function to produce a result. The signature is as follows:

public static IEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IEnumerable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector);

A simple example is shown below:

Enumerable.Range(1, 26).Zip(
"abcdefghijklmnopqrstuvwxyz",
(i, c) => "alpha[" + i + "] = " + c
).Run(Console.WriteLine);

In here, the first sequence is an IEnumerable<int> and the second one is a string, hence an IEnumerable<char>. The result is a table of mappings between numbers and letters. As an exercise, implement the following overload of Select using Zip and Generate, in terms of the more commonly used overload of Select that doesn’t take a position in the selector function:

public static IEnumerable<TResult> Select<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, int, TResult> selector);

One thing that’s interesting about the interactive version of Zip is its left-to-right characteristic with regards to enumeration of first and second. Internally, it does something along the following lines:

while (first.MoveNext() && second.MoveNext())
…

In other words, “first” is dominant in that it can prevent a MoveNext call on second from happening, e.g. because of an exception getting thrown, non-termination (stuck forever) and termination (returning false). The following matrix shows the implications of this:

It’s left as an exercise to the reader to implement the right-hand side behavior (notice the transposition symmetry!) for fun, where a Zip could fetch results from both sources simultaneously, combining their results or exceptions into produced results. What are advantages and disadvantages of such an approach? As an additional question, think about ways to detect and report an asymmetric zip, where one of both sides still has an element while the other side has signaled termination.

Finally, the diagram illustrating some of the regular operations of Zip. Other combinations of behavior can be read from the matrix above.

# Scan, a running aggregation operator

Readers familiar with the LINQ to Objects APIs will know about the Aggregate operator, which we also mentioned before when talking about the new Generate operator (as the opposite of Aggregate). Aggregate “folds” or reduces a sequence of elements into a single value, eating the elements one by one using some specified function. However, sometimes you may not want to loose the intermediate results, e.g. if you want to compute a running sum or so. Scan allows you to do so:

public static IEnumerable<TSource> Scan<TSource>(this IEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator);
public static IEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator);

You’ll see big similarities with the existing Aggregate operator when looking at the signatures above, and use of the operator is straightforward as well:

Enumerable.Range(1, 10)
.Scan((sum, i) => sum + i)
.Run(Console.WriteLine);

Enumerable.Range(2, 9).Reverse()
.Scan(3628800, (prod, i) => prod / i)
.Run(Console.WriteLine);

The first sample will simply print 1, 1+2 = 3, 3+3 = 6, 6+4 = 10, … In the second sample, a seed value is used to illustrate an inverse factorial computation, dividing a given value by subsequent descending values (from 10 to 2).

# SelectMany

Finally, as a honor to the monadic bind operator, a new overload was added for SelectMany :-). Its signature is shown below, and it’s left to the reader to figure out what it does (simple):

public static IEnumerable<TOther> SelectMany<TSource, TOther>(this IEnumerable<TSource> source, IEnumerable<TOther> other);

# Next on More LINQ

Functionally inspired constructs allowing to share enumerables and tame their side-effects.

Del.icio.us | Digg It | Technorati | Blinklist | Furl | reddit | DotNetKicks

Filed under: ,

#### #The Morning Brew - Chris Alcock &raquo; The Morning Brew #507

Wednesday, December 30, 2009 3:15 AM by The Morning Brew - Chris Alcock » The Morning Brew #507

Pingback from  The Morning Brew - Chris Alcock  &raquo; The Morning Brew #507

#### #Dew Drop &#8211; December 30, 2009 | Alvin Ashcraft&#039;s Morning Dew

Wednesday, December 30, 2009 5:30 AM by Dew Drop – December 30, 2009 | Alvin Ashcraft's Morning Dew

Pingback from  Dew Drop &#8211; December 30, 2009 | Alvin Ashcraft&#039;s Morning Dew

#### #Twitter Trackbacks for More LINQ with System.Interactive ??? More combinators for your Swiss Army Knife - B# .NET Blog [bartdesmet.net] on Topsy.com

Pingback from  Twitter Trackbacks for                 More LINQ with System.Interactive ??? More combinators for your Swiss Army Knife - B# .NET Blog         [bartdesmet.net]        on Topsy.com

#### #Еще большое о LINQ с System.Interactive

Wednesday, December 30, 2009 10:09 AM by progg.ru

Thank you for submitting this cool story - Trackback from progg.ru

#### #re: More LINQ with System.Interactive – More combinators for your Swiss Army Knife

Monday, January 04, 2010 9:29 PM by Dennis

Thank you Bart. Looking forward to "Functional" operators.

#### #Reactive Extensions for .NET (Rx) &laquo; Just Justin&#039;s

Saturday, February 06, 2010 3:45 AM by Reactive Extensions for .NET (Rx) « Just Justin's

Pingback from  Reactive Extensions for .NET (Rx)  &laquo; Just Justin&#039;s

#### #Reactive Extensions for .NET ( “stuff happens” )

Wednesday, August 18, 2010 7:18 AM by Mike Taulty's Blog

I’ve been taking a look at the Reactive Extensions for .NET. It’s early days for me at this point but

#### #Merge sequences in parallel

Monday, June 13, 2011 2:45 PM by Chasing state of the art

In practice you may find yourself in a situation when you have several sequences of data that you want to drain in parallel and merge the results into a single sequence. This is what Merge combinator for sequences does. Reactive Extensions had it for

#### #Right-hand side Enumerable.Zip

Monday, June 13, 2011 2:46 PM by Chasing state of the art

With Reactive Extensions for .NET (Rx) and .NET Framework 4 a new LINQ operator was introduced – Zip ( Bart De Smet gives excellent explanation about the idea and implementation details behind Zip operator ). In a nutshell it merges two sequences into