C# - Stopping all subsequent calls to Subscribe in Reactive Extensions

I am replacing some of my existing code with Reactive Extensions.

Previously I was using File.ReadAllLines(filename) and looping through the lines of the file. At some point in the loop I would break out of it and perform an operation on the records processed so far (which I now want to do in the OnCompleted action of the Subscribe method).

Now I am trying to use this:

    var observableSequence = File.ReadAllLines(filename).ToObservable();
    observableSequence.Subscribe(u =>
    {
        // Do something here. At some point I need to stop receiving
        // further calls and go straight to the OnCompleted action.
    });

I understand there is the cheap hack of an if condition that is never satisfied after that point, but I still don't want to waste CPU cycles on processing unnecessary events.

The only way I know to stop is to dispose the subscription, but doing that partway through, from inside the subscriber itself, won't be possible.

So is there a way to ignore events after some point and jump straight to OnCompleted?

Since you want some sort of manual way to stop the observable, try this:

    var signal = new Subject<Unit>();
    var observableSequence =
        File
            .ReadAllLines(filename)
            .ToObservable(Scheduler.Default)
            .TakeUntil(signal);
    var count = 0;
    observableSequence.Subscribe(u =>
    {
        if (++count == 100)
        {
            // Firing the signal makes TakeUntil complete the sequence.
            signal.OnNext(Unit.Default);
        }
    }, () => Console.WriteLine(count));

    Console.WriteLine(count);

It would be preferable to make the stop condition part of the query, though. Like this:

    var observableSequence =
        File
            .ReadAllLines(filename)
            .ToObservable(Scheduler.Default)
            .TakeWhile((x, n) => n < 10);
    var count = 0;
    observableSequence.Subscribe(u => ++count, () => Console.WriteLine(count));

    Console.WriteLine(count);
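If the stopping point depends on the content of a line rather than on its index, the same idea should work with the predicate overload of TakeWhile. The following is only a minimal sketch under some assumptions: the in-memory `lines` array stands in for File.ReadAllLines(filename), the "STOP" marker and the `processed` list are hypothetical names chosen for illustration, and the System.Reactive package is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

class Program
{
    static void Main()
    {
        // Hypothetical stand-in for File.ReadAllLines(filename).
        string[] lines = { "alpha", "beta", "STOP", "gamma", "delta" };

        // Records handled before the stop condition is hit.
        var processed = new List<string>();

        lines
            .ToObservable()
            // The sequence completes as soon as the predicate returns false,
            // so "gamma" and "delta" never reach the subscriber.
            .TakeWhile(line => line != "STOP")
            .Subscribe(
                line => processed.Add(line),
                // OnCompleted: work on the records collected so far.
                () => Console.WriteLine($"Processed {processed.Count} records."));
        // With the sample data above this prints "Processed 2 records."
    }
}
```

Because the stop condition lives in the query, no extra subject or manual disposal is needed, and the OnCompleted action runs once the first non-matching line is seen.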
