Zac Medico
2018-07-05 05:24:26 UTC
For readability, it's desirable to make asynchronous code use
coroutines to avoid callbacks when possible. For python2 compatibility,
generators that yield Futures can be used to implement coroutines.
Add a compat_coroutine module which provides a @coroutine decorator
and a coroutine_return function that can be used to return a value
from a generator. The decorated function returns a Future which is
done when the generator is exhausted. Usage is very similar to asyncio
coroutine usage in python3.4 (see unit tests).
Bug: https://bugs.gentoo.org/660426
---
.../tests/util/futures/test_compat_coroutine.py | 57 ++++++++++++++
pym/portage/util/futures/compat_coroutine.py | 90 ++++++++++++++++++++++
2 files changed, 147 insertions(+)
create mode 100644 pym/portage/tests/util/futures/test_compat_coroutine.py
create mode 100644 pym/portage/util/futures/compat_coroutine.py
diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py b/pym/portage/tests/util/futures/test_compat_coroutine.py
new file mode 100644
index 0000000000..4a1d931b6b
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py
@@ -0,0 +1,57 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+ coroutine,
+ coroutine_return,
+)
+from portage.tests import TestCase
+
+
+class CompatCoroutineTestCase(TestCase):
+ """Tests for the @coroutine decorator and coroutine_return."""
+ def test_returning_coroutine(self):
+ @coroutine
+ def returning_coroutine():
+ coroutine_return('success')  # raises, so execution never reaches the yield
+ yield None  # unreachable; only makes this function a generator
+
+ self.assertEqual('success',
+ asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+
+ def test_raising_coroutine(self):
+ # An exception raised inside the generator must surface from run_until_complete.
+ class TestException(Exception):
+ pass
+
+ @coroutine
+ def raising_coroutine():
+ raise TestException('exception')
+ yield None  # unreachable; only makes this function a generator
+
+ self.assertRaises(TestException,
+ asyncio.get_event_loop().run_until_complete, raising_coroutine())
+
+ def test_cancelled_coroutine(self):
+ # Cancelling the Future returned by the coroutine must raise CancelledError.
+ @coroutine
+ def endlessly_sleeping_coroutine(loop=None):
+ loop = asyncio._wrap_loop(loop)
+ yield loop.create_future()  # nothing in this test ever completes this Future
+
+ loop = asyncio.get_event_loop()
+ future = endlessly_sleeping_coroutine(loop=loop)
+ loop.call_soon(future.cancel)  # cancel from inside the running loop
+
+ self.assertRaises(asyncio.CancelledError,
+ loop.run_until_complete, future)
+
+ def test_sleeping_coroutine(self):
+ @coroutine
+ def sleeping_coroutine():
+ for i in range(3):
+ x = yield asyncio.sleep(0, result=i)  # yield-expression takes the Future's result
+ self.assertEqual(x, i)
+
+ asyncio.get_event_loop().run_until_complete(sleeping_coroutine())
diff --git a/pym/portage/util/futures/compat_coroutine.py b/pym/portage/util/futures/compat_coroutine.py
new file mode 100644
index 0000000000..eea0b2883e
--- /dev/null
+++ b/pym/portage/util/futures/compat_coroutine.py
@@ -0,0 +1,90 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+import functools
+
+
+def coroutine(generator_func):
+    """
+    Decorate a generator function so that calling it returns a Future
+    that is done when the generator is exhausted. The generator yields
+    Futures to wait for them (results arrive via PEP 342 send()) and may
+    return a value via coroutine_return. A plain wrapper is used, since
+    functools.partial is not a descriptor and so cannot decorate methods.
+    """
+    @functools.wraps(generator_func)
+    def wrapped(*args, **kwargs):
+        return _generator_future(generator_func, *args, **kwargs)
+    return wrapped
+
+
+def coroutine_return(result=None):
+ """
+ Return result from the current coroutine by raising _CoroutineReturnValue.
+ """
+ raise _CoroutineReturnValue(result)  # caught in _GeneratorTask._next
+
+
+def _generator_future(generator_func, *args, **kwargs):
+ """
+ Call generator_func with the given arguments, and return a Future
+ that is done when the resulting generator is exhausted. If a
+ keyword argument named 'loop' is given, then it is used instead of
+ the default event loop.
+ """
+ loop = asyncio._wrap_loop(kwargs.get('loop'))  # 'loop' stays in kwargs
+ result = loop.create_future()
+ _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)  # generator_func also receives 'loop'
+ return result
+
+
+class _CoroutineReturnValue(Exception):  # raised by coroutine_return()
+ def __init__(self, result):
+ self.result = result  # the value given to coroutine_return()
+
+
+class _GeneratorTask(object):
+    """
+    Runs generator to completion, waiting for each Future that it yields
+    (scheduled on loop) and resuming it with that Future's result or
+    exception. The outcome is set on the result Future.
+    """
+    def __init__(self, generator, result, loop):
+        self._generator = generator
+        self._result = result
+        self._loop = loop
+        result.add_done_callback(self._cancel_callback)
+        self._next()
+
+    def _cancel_callback(self, result):
+        # Done callback of the result Future: cancellation closes the generator.
+        if result.cancelled():
+            self._generator.close()
+
+    def _next(self, previous=None):
+        if self._result.cancelled():
+            return
+        try:
+            # send() and throw() resume the generator and return the next
+            # Future that it yields, so they must not be followed by next().
+            if previous is None:
+                future = next(self._generator)
+            elif previous.cancelled():
+                future = self._generator.throw(asyncio.CancelledError())
+            elif previous.exception() is None:
+                future = self._generator.send(previous.result())
+            else:
+                future = self._generator.throw(previous.exception())
+        except _CoroutineReturnValue as e:
+            if not self._result.cancelled():
+                self._result.set_result(e.result)
+        except StopIteration:
+            if not self._result.cancelled():
+                self._result.set_result(None)
+        except Exception as e:
+            if not self._result.cancelled():
+                self._result.set_exception(e)
+        else:
+            future = asyncio.ensure_future(future, loop=self._loop)
+            future.add_done_callback(self._next)
coroutines to avoid callbacks when possible. For python2 compatibility,
generators that yield Futures can be used to implement coroutines.
Add a compat_coroutine module which provides a @coroutine decorator
and a coroutine_return function that can be used to return a value
from a generator. The decorated function returns a Future which is
done when the generator is exhausted. Usage is very similar to asyncio
coroutine usage in python3.4 (see unit tests).
Bug: https://bugs.gentoo.org/660426
---
.../tests/util/futures/test_compat_coroutine.py | 57 ++++++++++++++
pym/portage/util/futures/compat_coroutine.py | 90 ++++++++++++++++++++++
2 files changed, 147 insertions(+)
create mode 100644 pym/portage/tests/util/futures/test_compat_coroutine.py
create mode 100644 pym/portage/util/futures/compat_coroutine.py
diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py b/pym/portage/tests/util/futures/test_compat_coroutine.py
new file mode 100644
index 0000000000..4a1d931b6b
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py
@@ -0,0 +1,57 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+ coroutine,
+ coroutine_return,
+)
+from portage.tests import TestCase
+
+
+class CompatCoroutineTestCase(TestCase):
+ """Tests for the @coroutine decorator and coroutine_return."""
+ def test_returning_coroutine(self):
+ @coroutine
+ def returning_coroutine():
+ coroutine_return('success')  # raises, so execution never reaches the yield
+ yield None  # unreachable; only makes this function a generator
+
+ self.assertEqual('success',
+ asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+
+ def test_raising_coroutine(self):
+ # An exception raised inside the generator must surface from run_until_complete.
+ class TestException(Exception):
+ pass
+
+ @coroutine
+ def raising_coroutine():
+ raise TestException('exception')
+ yield None  # unreachable; only makes this function a generator
+
+ self.assertRaises(TestException,
+ asyncio.get_event_loop().run_until_complete, raising_coroutine())
+
+ def test_cancelled_coroutine(self):
+ # Cancelling the Future returned by the coroutine must raise CancelledError.
+ @coroutine
+ def endlessly_sleeping_coroutine(loop=None):
+ loop = asyncio._wrap_loop(loop)
+ yield loop.create_future()  # nothing in this test ever completes this Future
+
+ loop = asyncio.get_event_loop()
+ future = endlessly_sleeping_coroutine(loop=loop)
+ loop.call_soon(future.cancel)  # cancel from inside the running loop
+
+ self.assertRaises(asyncio.CancelledError,
+ loop.run_until_complete, future)
+
+ def test_sleeping_coroutine(self):
+ @coroutine
+ def sleeping_coroutine():
+ for i in range(3):
+ x = yield asyncio.sleep(0, result=i)  # yield-expression takes the Future's result
+ self.assertEqual(x, i)
+
+ asyncio.get_event_loop().run_until_complete(sleeping_coroutine())
diff --git a/pym/portage/util/futures/compat_coroutine.py b/pym/portage/util/futures/compat_coroutine.py
new file mode 100644
index 0000000000..eea0b2883e
--- /dev/null
+++ b/pym/portage/util/futures/compat_coroutine.py
@@ -0,0 +1,90 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+import functools
+
+
+def coroutine(generator_func):
+    """
+    Decorate a generator function so that calling it returns a Future
+    that is done when the generator is exhausted. The generator yields
+    Futures to wait for them (results arrive via PEP 342 send()) and may
+    return a value via coroutine_return. A plain wrapper is used, since
+    functools.partial is not a descriptor and so cannot decorate methods.
+    """
+    @functools.wraps(generator_func)
+    def wrapped(*args, **kwargs):
+        return _generator_future(generator_func, *args, **kwargs)
+    return wrapped
+
+
+def coroutine_return(result=None):
+ """
+ Return result from the current coroutine by raising _CoroutineReturnValue.
+ """
+ raise _CoroutineReturnValue(result)  # caught in _GeneratorTask._next
+
+
+def _generator_future(generator_func, *args, **kwargs):
+ """
+ Call generator_func with the given arguments, and return a Future
+ that is done when the resulting generator is exhausted. If a
+ keyword argument named 'loop' is given, then it is used instead of
+ the default event loop.
+ """
+ loop = asyncio._wrap_loop(kwargs.get('loop'))  # 'loop' stays in kwargs
+ result = loop.create_future()
+ _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)  # generator_func also receives 'loop'
+ return result
+
+
+class _CoroutineReturnValue(Exception):  # raised by coroutine_return()
+ def __init__(self, result):
+ self.result = result  # the value given to coroutine_return()
+
+
+class _GeneratorTask(object):
+    """
+    Runs generator to completion, waiting for each Future that it yields
+    (scheduled on loop) and resuming it with that Future's result or
+    exception. The outcome is set on the result Future.
+    """
+    def __init__(self, generator, result, loop):
+        self._generator = generator
+        self._result = result
+        self._loop = loop
+        result.add_done_callback(self._cancel_callback)
+        self._next()
+
+    def _cancel_callback(self, result):
+        # Done callback of the result Future: cancellation closes the generator.
+        if result.cancelled():
+            self._generator.close()
+
+    def _next(self, previous=None):
+        if self._result.cancelled():
+            return
+        try:
+            # send() and throw() resume the generator and return the next
+            # Future that it yields, so they must not be followed by next().
+            if previous is None:
+                future = next(self._generator)
+            elif previous.cancelled():
+                future = self._generator.throw(asyncio.CancelledError())
+            elif previous.exception() is None:
+                future = self._generator.send(previous.result())
+            else:
+                future = self._generator.throw(previous.exception())
+        except _CoroutineReturnValue as e:
+            if not self._result.cancelled():
+                self._result.set_result(e.result)
+        except StopIteration:
+            if not self._result.cancelled():
+                self._result.set_result(None)
+        except Exception as e:
+            if not self._result.cancelled():
+                self._result.set_exception(e)
+        else:
+            future = asyncio.ensure_future(future, loop=self._loop)
+            future.add_done_callback(self._next)
--
2.13.6
2.13.6