Kasper Peulen
2016-01-07 13:41:14 UTC
I'm trying to implement a transformer like this one for Dart:
http://reactivex.io/documentation/operators/retry.html
So assume, I have this cute little stream of length 3:
Stream myStream() async* {
await new Future.delayed(new Duration(seconds: 1));
yield 'first time';
await new Future.delayed(new Duration(seconds: 1));
yield 'second time';
await new Future.delayed(new Duration(seconds: 1));
if (new Random().nextInt(5) > 0) {
throw new Exception('Oh noes!!');
}
await new Future.delayed(new Duration(seconds: 1));
yield 'third time';
}
Well theoretical length 3, there are some problems. There is a high chance
an exception is raised just before the stream closes. So therefore, we
write some retry logic:
Stream retry(Stream stream(), int maxTries) async* {
try {
await for (dynamic e in stream()) {
yield e;
}
} catch (error) {
if (maxTries > 0) {
yield* retry(stream, maxTries - 1);
} else {
throw error;
}
}
}
So this works perfectly fine. But now this works perfectly fine, with this
as main method:
main() {
retry(myStream, 10).listen(print);
}
But now I want to write a transformer:
main() {
myStream().transform(new Retry(10)).listen(print);
}
class Retry<T> implements StreamTransformer<T, T> {
final maxTries;
Retry(this.maxTries);
Stream<T> bind(Stream<T> stream) => retry(() => stream, maxTries);
Stream<T> retry(Stream<T> stream(), int maxTries) async* {
try {
await for (dynamic e in stream()) {
yield e;
}
} catch (error) {
if (maxTries > 0) {
yield* retry(stream, maxTries - 1);
} else {
throw(error);
}
}
}
}
Now this gives the following error:
first time
second time
Unhandled exception:
Bad state: Stream has already been listened to.
Now I kind of understand why this doesn't work, the stream is already
completed, and I try to use it again.
So basically what I want/need it seems, is being able to resubscribe to a
stream. Kind of creating a new fresh instance of the stream.
Is that possible? Or as the Rx manual says it:
The Retry operator responds to an onError notification from the source
Observable by not passing that call through to its observers, but instead
by resubscribing to the source Observable and giving it another opportunity
to complete its sequence without error.
Any ideas?
http://reactivex.io/documentation/operators/retry.html
So assume, I have this cute little stream of length 3:
Stream myStream() async* {
await new Future.delayed(new Duration(seconds: 1));
yield 'first time';
await new Future.delayed(new Duration(seconds: 1));
yield 'second time';
await new Future.delayed(new Duration(seconds: 1));
if (new Random().nextInt(5) > 0) {
throw new Exception('Oh noes!!');
}
await new Future.delayed(new Duration(seconds: 1));
yield 'third time';
}
Well theoretical length 3, there are some problems. There is a high chance
an exception is raised just before the stream closes. So therefore, we
write some retry logic:
Stream retry(Stream stream(), int maxTries) async* {
try {
await for (dynamic e in stream()) {
yield e;
}
} catch (error) {
if (maxTries > 0) {
yield* retry(stream, maxTries - 1);
} else {
throw error;
}
}
}
So this works perfectly fine. But now this works perfectly fine, with this
as main method:
main() {
retry(myStream, 10).listen(print);
}
But now I want to write a transformer:
main() {
myStream().transform(new Retry(10)).listen(print);
}
class Retry<T> implements StreamTransformer<T, T> {
final maxTries;
Retry(this.maxTries);
Stream<T> bind(Stream<T> stream) => retry(() => stream, maxTries);
Stream<T> retry(Stream<T> stream(), int maxTries) async* {
try {
await for (dynamic e in stream()) {
yield e;
}
} catch (error) {
if (maxTries > 0) {
yield* retry(stream, maxTries - 1);
} else {
throw(error);
}
}
}
}
Now this gives the following error:
first time
second time
Unhandled exception:
Bad state: Stream has already been listened to.
Now I kind of understand why this doesn't work, the stream is already
completed, and I try to use it again.
So basically what I want/need it seems, is being able to resubscribe to a
stream. Kind of creating a new fresh instance of the stream.
Is that possible? Or as the Rx manual says it:
The Retry operator responds to an onError notification from the source
Observable by not passing that call through to its observers, but instead
by resubscribing to the source Observable and giving it another opportunity
to complete its sequence without error.
Any ideas?
--
For other discussions, see https://groups.google.com/a/dartlang.org/
For HOWTO questions, visit http://stackoverflow.com/tags/dart
To file a bug report or feature request, go to http://www.dartbug.com/new
---
You received this message because you are subscribed to the Google Groups "Dart Misc" group.
To unsubscribe from this group and stop receiving emails from it, send an email to misc+***@dartlang.org.
For other discussions, see https://groups.google.com/a/dartlang.org/
For HOWTO questions, visit http://stackoverflow.com/tags/dart
To file a bug report or feature request, go to http://www.dartbug.com/new
---
You received this message because you are subscribed to the Google Groups "Dart Misc" group.
To unsubscribe from this group and stop receiving emails from it, send an email to misc+***@dartlang.org.