Discussion:
[dart-misc] Implementing retry transformer with streams
Kasper Peulen
2016-01-07 13:41:14 UTC
I'm trying to implement a transformer like this one for Dart:
http://reactivex.io/documentation/operators/retry.html

So assume, I have this cute little stream of length 3:

import 'dart:async';
import 'dart:math';

Stream myStream() async* {
  await new Future.delayed(new Duration(seconds: 1));
  yield 'first time';

  await new Future.delayed(new Duration(seconds: 1));
  yield 'second time';

  await new Future.delayed(new Duration(seconds: 1));

  // Fails on 4 out of 5 runs, just before the final event.
  if (new Random().nextInt(5) > 0) {
    throw new Exception('Oh noes!!');
  }

  await new Future.delayed(new Duration(seconds: 1));
  yield 'third time';
}


Well, theoretically of length 3; in practice there is a problem. There is a
high chance (4 in 5) that an exception is thrown just before the third
event. Therefore, we write some retry logic:

Stream retry(Stream stream(), int maxTries) async* {
  try {
    await for (dynamic e in stream()) {
      yield e;
    }
  } catch (error) {
    if (maxTries > 0) {
      // Call stream() again to get a fresh stream and start over.
      yield* retry(stream, maxTries - 1);
    } else {
      rethrow;
    }
  }
}


This works perfectly fine with the following main method:

main() {
  retry(myStream, 10).listen(print);
}


But now I want to write a transformer:

main() {
  myStream().transform(new Retry(10)).listen(print);
}

class Retry<T> implements StreamTransformer<T, T> {
  final maxTries;
  Retry(this.maxTries);

  Stream<T> bind(Stream<T> stream) => retry(() => stream, maxTries);

  Stream<T> retry(Stream<T> stream(), int maxTries) async* {
    try {
      await for (dynamic e in stream()) {
        yield e;
      }
    } catch (error) {
      if (maxTries > 0) {
        yield* retry(stream, maxTries - 1);
      } else {
        throw error;
      }
    }
  }
}


Now this gives the following error:


first time
second time
Unhandled exception:
Bad state: Stream has already been listened to.


Now I kind of understand why this doesn't work: the stream has already been
listened to, and I try to listen to it again.
So basically what I seem to need is a way to resubscribe to a stream, i.e.
to create a fresh instance of it.
Is that possible? Or, as the Rx manual puts it:

The Retry operator responds to an onError notification from the source
Observable by not passing that call through to its observers, but instead
by resubscribing to the source Observable and giving it another opportunity
to complete its sequence without error.

Any ideas?
--
For other discussions, see https://groups.google.com/a/dartlang.org/

For HOWTO questions, visit http://stackoverflow.com/tags/dart

To file a bug report or feature request, go to http://www.dartbug.com/new
---
You received this message because you are subscribed to the Google Groups "Dart Misc" group.
To unsubscribe from this group and stop receiving emails from it, send an email to misc+***@dartlang.org.
'Lasse R.H. Nielsen' via Dart Misc
2016-01-07 13:54:30 UTC
Dart streams cannot be rerun [1].

What you need here is multiple independent streams trying to do the same
operation (just repeating the events won't solve the error problem).
So, you need to ensure that your function returning a stream will return a
*new* stream each time. That means you can't do that at the stream level,
you need to do it at the previous level where you have code to create the
stream.
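
To make the restriction concrete, here is a minimal sketch. The function name
numbers is made up for illustration. Listening a second time to the same
single-subscription stream instance throws a StateError, whereas calling the
generator function again produces a new, independent stream:

```dart
import 'dart:async';

// Hypothetical example stream; each call creates a fresh instance.
Stream<int> numbers() async* {
  yield 1;
  yield 2;
}

Future<void> main() async {
  final stream = numbers();

  // First listen (via toList) consumes the stream normally.
  print(await stream.toList());

  try {
    // Second listen on the same instance is rejected.
    stream.listen(print);
  } on StateError catch (e) {
    print('Second listen failed: ${e.message}');
  }

  // Calling the function again gives a new stream that can be listened to.
  print(await numbers().toList());
}
```

This is why retry logic needs access to the function that creates the stream,
not just to one stream instance.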

/L


[1] I personally think that's a mistake, and each call to listen on a
non-broadcast stream should replay the stream from scratch *if possible*.
It's not always possible (or would require repeating a larger operation,
like resending an HTTP request to get the headers again). On the other
hand, reading a file should be repeatable: you just read it again. Computed
streams like Stream.fromIterable could easily be repeatable (and initially
were).
It's probably not something we can change now since all existing streams
will not work like that, and stream controllers would have to handle
individual onListen callbacks somehow.

/L
--
Lasse R.H. Nielsen - ***@google.com
'Faith without judgement merely degrades the spirit divine'
Google Denmark ApS - Frederiksborggade 20B, 1 sal - 1360 København K
- Denmark - CVR nr. 28 86 69 84
Kasper Peulen
2016-01-07 14:49:25 UTC
Ok, I think I understand this, but practically, this means that I cannot
implement a Retry transformer the way I am trying to?

Because I think the transformer will only be able to touch the stream
itself, and not the function that generates a new stream.

I was basically trying to figure out how you could do things like this,
which you can do with observables in RxJS:

this.http.get('http://localhost:3001/api/random-quote')
  .retry(3)
  .map(res => res.text())
  .filter(x => x.length > 100)
  .subscribe(
    data => this.randomQuote = data,
    err => this.logError(err),
    () => console.log('Random Quote Complete')
  );



'Lasse R.H. Nielsen' via Dart Misc
2016-01-08 08:38:04 UTC
Post by Kasper Peulen
> Ok, I think I understand this, but practically, this means that I cannot
> implement a Retry transformer the way I am trying to?

Yes.

> Because I think the transformer will only be able to touch the stream
> itself, and not the function that generates a new stream.

True. A StreamTransformer is like the map function on iterable. It can
change the individual events, but it only has the one stream to work with.

> I was basically trying to figure out how you could do things like this,
> which you can do with observables in RxJS:
>
> this.http.get('http://localhost:3001/api/random-quote')
>   .retry(3)
>   .map(res => res.text())
>   .filter(x => x.length > 100)
>   .subscribe(
>     data => this.randomQuote = data,
>     err => this.logError(err),
>     () => console.log('Random Quote Complete')
>   );

It is an interesting pattern. It really requires that the stream itself can
be replayed (so the http.get doesn't actually perform the get operation,
but waits until you do a subscribe at the end).
That's not how Dart streams work. I wish it was :)
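
As a rough sketch of what retrying at the level where the stream is created
could look like, the helper below takes the function that creates the stream
rather than the stream itself. The names retryFrom and fetchQuote are
hypothetical and not part of any library; fetchQuote merely stands in for a
request that fails twice before succeeding, so that the example is
deterministic:

```dart
import 'dart:async';

// Counts how often the hypothetical request has been started.
int calls = 0;

// Stand-in for a request: fails on the first two calls, then succeeds.
Stream<String> fetchQuote() async* {
  calls++;
  if (calls < 3) {
    throw Exception('request $calls failed');
  }
  yield 'quote from request $calls';
}

// Re-invokes [create] after an error, up to [maxRetries] extra times.
// Events emitted before the error are not rolled back.
Stream<T> retryFrom<T>(Stream<T> Function() create, int maxRetries) async* {
  var retries = 0;
  while (true) {
    try {
      await for (final event in create()) {
        yield event;
      }
      return; // The source completed without an error.
    } catch (_) {
      if (retries >= maxRetries) rethrow;
      retries++;
    }
  }
}

Future<void> main() async {
  // Passing the function defers creation, so each retry gets a fresh stream.
  await retryFrom(fetchQuote, 3)
      .map((text) => text.toUpperCase())
      .forEach(print);
}
```

The loop uses await for rather than yield*, because yield* forwards errors to
the output stream instead of throwing them inside the generator, so a
surrounding try/catch would not see them. The call site is not a chained
.retry(3) as in RxJS, but later operators such as map can still be chained on
the result.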

/L