Discussion:
[gentoo-portage-dev] [PATCH] Add python2 compatible coroutine support (bug 660426)
Zac Medico
2018-07-05 05:24:26 UTC
Permalink
For readability, it's desirable to make asynchronous code use
coroutines to avoid callbacks when possible. For python2 compatibility,
generators that yield Futures can be used to implement coroutines.

Add a compat_coroutine module which provides a @coroutine decorator
and a coroutine_return function that can be used to return a value
from a generator. The decorated function returns a Future which is
done when the generator is exhausted. Usage is very similar to asyncio
coroutine usage in python3.4 (see unit tests).

Bug: https://bugs.gentoo.org/660426
---
.../tests/util/futures/test_compat_coroutine.py | 57 ++++++++++++++
pym/portage/util/futures/compat_coroutine.py | 90 ++++++++++++++++++++++
2 files changed, 147 insertions(+)
create mode 100644 pym/portage/tests/util/futures/test_compat_coroutine.py
create mode 100644 pym/portage/util/futures/compat_coroutine.py

diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py b/pym/portage/tests/util/futures/test_compat_coroutine.py
new file mode 100644
index 0000000000..4a1d931b6b
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py
@@ -0,0 +1,57 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+ coroutine,
+ coroutine_return,
+)
+from portage.tests import TestCase
+
+
+class CompatCoroutineTestCase(TestCase):
+
+ def test_returning_coroutine(self):
+ @coroutine
+ def returning_coroutine():
+ coroutine_return('success')
+ yield None
+
+ self.assertEqual('success',
+ asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+
+ def test_raising_coroutine(self):
+
+ class TestException(Exception):
+ pass
+
+ @coroutine
+ def raising_coroutine():
+ raise TestException('exception')
+ yield None
+
+ self.assertRaises(TestException,
+ asyncio.get_event_loop().run_until_complete, raising_coroutine())
+
+ def test_cancelled_coroutine(self):
+
+ @coroutine
+ def endlessly_sleeping_coroutine(loop=None):
+ loop = asyncio._wrap_loop(loop)
+ yield loop.create_future()
+
+ loop = asyncio.get_event_loop()
+ future = endlessly_sleeping_coroutine(loop=loop)
+ loop.call_soon(future.cancel)
+
+ self.assertRaises(asyncio.CancelledError,
+ loop.run_until_complete, future)
+
+ def test_sleeping_coroutine(self):
+ @coroutine
+ def sleeping_coroutine():
+ for i in range(3):
+ x = yield asyncio.sleep(0, result=i)
+ self.assertEqual(x, i)
+
+ asyncio.get_event_loop().run_until_complete(sleeping_coroutine())
diff --git a/pym/portage/util/futures/compat_coroutine.py b/pym/portage/util/futures/compat_coroutine.py
new file mode 100644
index 0000000000..eea0b2883e
--- /dev/null
+++ b/pym/portage/util/futures/compat_coroutine.py
@@ -0,0 +1,90 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+import functools
+
+
+def coroutine(generator_func):
+ """
+ A decorator for a generator function that behaves as coroutine function.
+ The generator should yield a Future instance in order to wait for it,
+ and the result becomes the result of the current yield-expression,
+ via the PEP 342 generator send() method.
+
+ The decorated function returns a Future which is done when the generator
+ is exhausted. The generator can return a value via the coroutine_return
+ function.
+ """
+ return functools.partial(_generator_future, generator_func)
+
+
+def coroutine_return(result=None):
+ """
+ Return a result from the current coroutine.
+ """
+ raise _CoroutineReturnValue(result)
+
+
+def _generator_future(generator_func, *args, **kwargs):
+ """
+ Call generator_func with the given arguments, and return a Future
+ that is done when the resulting generation is exhausted. If is a
+ keyword argument named 'loop' is given, then it is used instead of
+ the default event loop.
+ """
+ loop = asyncio._wrap_loop(kwargs.get('loop'))
+ result = loop.create_future()
+ _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)
+ return result
+
+
+class _CoroutineReturnValue(Exception):
+ def __init__(self, result):
+ self.result = result
+
+
+class _GeneratorTask(object):
+ """
+ Asynchronously executes the generator to completion, waiting for
+ the result of each Future that it yields, and sending the result
+ to the generator.
+ """
+ def __init__(self, generator, result, loop):
+ self._generator = generator
+ self._result = result
+ self._loop = loop
+ result.add_done_callback(self._cancel_callback)
+ self._next()
+
+ def _cancel_callback(self, result):
+ if result.cancelled():
+ self._generator.close()
+
+ def _next(self, previous=None):
+ if self._result.cancelled():
+ return
+ try:
+ if previous is None:
+ future = next(self._generator)
+ elif previous.cancelled():
+ self._generator.throw(asyncio.CancelledError())
+ future = next(self._generator)
+ elif previous.exception() is None:
+ future = self._generator.send(previous.result())
+ else:
+ self._generator.throw(previous.exception())
+ future = next(self._generator)
+
+ except _CoroutineReturnValue as e:
+ if not self._result.cancelled():
+ self._result.set_result(e.result)
+ except StopIteration:
+ if not self._result.cancelled():
+ self._result.set_result(None)
+ except Exception as e:
+ if not self._result.cancelled():
+ self._result.set_exception(e)
+ else:
+ future = asyncio.ensure_future(future, loop=self._loop)
+ future.add_done_callback(self._next)
--
2.13.6
Zac Medico
2018-07-08 04:29:03 UTC
Permalink
For readability, it's desirable to make asynchronous code use
coroutines to avoid callbacks when possible. For python2 compatibility,
generators that yield Futures can be used to implement coroutines.

Add a compat_coroutine module which provides a @coroutine decorator
and a coroutine_return function that can be used to return a value
from a generator. The decorated function returns a Future which is
done when the generator is exhausted. Usage is very similar to asyncio
coroutine usage in python3.4 (see unit tests).

Bug: https://bugs.gentoo.org/660426
---
[PATCH v2] fixed to support decoration of object methods, and added
a unit test using this support to demonstrate interaction between
multiple coroutines

.../tests/util/futures/test_compat_coroutine.py | 122 +++++++++++++++++++++
pym/portage/util/futures/compat_coroutine.py | 96 ++++++++++++++++
2 files changed, 218 insertions(+)
create mode 100644 pym/portage/tests/util/futures/test_compat_coroutine.py
create mode 100644 pym/portage/util/futures/compat_coroutine.py

diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py b/pym/portage/tests/util/futures/test_compat_coroutine.py
new file mode 100644
index 0000000000..f9de409ae4
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py
@@ -0,0 +1,122 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+ coroutine,
+ coroutine_return,
+)
+from portage.tests import TestCase
+
+
+class CompatCoroutineTestCase(TestCase):
+
+ def test_returning_coroutine(self):
+ @coroutine
+ def returning_coroutine():
+ coroutine_return('success')
+ yield None
+
+ self.assertEqual('success',
+ asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+
+ def test_raising_coroutine(self):
+
+ class TestException(Exception):
+ pass
+
+ @coroutine
+ def raising_coroutine():
+ raise TestException('exception')
+ yield None
+
+ self.assertRaises(TestException,
+ asyncio.get_event_loop().run_until_complete, raising_coroutine())
+
+ def test_cancelled_coroutine(self):
+
+ @coroutine
+ def endlessly_sleeping_coroutine(loop=None):
+ loop = asyncio._wrap_loop(loop)
+ yield loop.create_future()
+
+ loop = asyncio.get_event_loop()
+ future = endlessly_sleeping_coroutine(loop=loop)
+ loop.call_soon(future.cancel)
+
+ self.assertRaises(asyncio.CancelledError,
+ loop.run_until_complete, future)
+
+ def test_sleeping_coroutine(self):
+ @coroutine
+ def sleeping_coroutine():
+ for i in range(3):
+ x = yield asyncio.sleep(0, result=i)
+ self.assertEqual(x, i)
+
+ asyncio.get_event_loop().run_until_complete(sleeping_coroutine())
+
+ def test_method_coroutine(self):
+
+ class Cubby(object):
+
+ _empty = object()
+
+ def __init__(self, loop):
+ self._loop = loop
+ self._value = self._empty
+ self._waiters = []
+
+ def _notify(self):
+ waiters = self._waiters
+ self._waiters = []
+ for waiter in waiters:
+ waiter.set_result(None)
+
+ def _wait(self):
+ waiter = self._loop.create_future()
+ self._waiters.append(waiter)
+ return waiter
+
+ @coroutine
+ def read(self):
+ while self._value is self._empty:
+ yield self._wait()
+
+ value = self._value
+ self._value = self._empty
+ self._notify()
+ coroutine_return(value)
+
+ @coroutine
+ def write(self, value):
+ while self._value is not self._empty:
+ yield self._wait()
+
+ self._value = value
+ self._notify()
+
+ @coroutine
+ def writer_coroutine(cubby, values, sentinel):
+ for value in values:
+ yield cubby.write(value)
+ yield cubby.write(sentinel)
+
+ @coroutine
+ def reader_coroutine(cubby, sentinel):
+ results = []
+ while True:
+ result = yield cubby.read()
+ if result == sentinel:
+ break
+ results.append(result)
+ coroutine_return(results)
+
+ loop = asyncio.get_event_loop()
+ cubby = Cubby(loop)
+ values = list(range(3))
+ writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop)
+ reader = asyncio.ensure_future(reader_coroutine(cubby, None), loop=loop)
+ loop.run_until_complete(asyncio.wait([writer, reader]))
+
+ self.assertEqual(reader.result(), values)
diff --git a/pym/portage/util/futures/compat_coroutine.py b/pym/portage/util/futures/compat_coroutine.py
new file mode 100644
index 0000000000..32909f4b4c
--- /dev/null
+++ b/pym/portage/util/futures/compat_coroutine.py
@@ -0,0 +1,96 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+import functools
+
+
+def coroutine(generator_func):
+ """
+ A decorator for a generator function that behaves as coroutine function.
+ The generator should yield a Future instance in order to wait for it,
+ and the result becomes the result of the current yield-expression,
+ via the PEP 342 generator send() method.
+
+ The decorated function returns a Future which is done when the generator
+ is exhausted. The generator can return a value via the coroutine_return
+ function.
+ """
+ # Note that functools.partial does not work for decoration of
+ # methods, since it doesn't implement the descriptor protocol.
+ # This problem is solve by defining a wrapper function.
+ @functools.wraps(generator_func)
+ def wrapped(*args, **kwargs):
+ return _generator_future(generator_func, *args, **kwargs)
+ return wrapped
+
+
+def coroutine_return(result=None):
+ """
+ Return a result from the current coroutine.
+ """
+ raise _CoroutineReturnValue(result)
+
+
+def _generator_future(generator_func, *args, **kwargs):
+ """
+ Call generator_func with the given arguments, and return a Future
+ that is done when the resulting generation is exhausted. If is a
+ keyword argument named 'loop' is given, then it is used instead of
+ the default event loop.
+ """
+ loop = asyncio._wrap_loop(kwargs.get('loop'))
+ result = loop.create_future()
+ _GeneratorTask(generator_func(*args, **kwargs), result, loop=loop)
+ return result
+
+
+class _CoroutineReturnValue(Exception):
+ def __init__(self, result):
+ self.result = result
+
+
+class _GeneratorTask(object):
+ """
+ Asynchronously executes the generator to completion, waiting for
+ the result of each Future that it yields, and sending the result
+ to the generator.
+ """
+ def __init__(self, generator, result, loop):
+ self._generator = generator
+ self._result = result
+ self._loop = loop
+ result.add_done_callback(self._cancel_callback)
+ loop.call_soon(self._next)
+
+ def _cancel_callback(self, result):
+ if result.cancelled():
+ self._generator.close()
+
+ def _next(self, previous=None):
+ if self._result.cancelled():
+ return
+ try:
+ if previous is None:
+ future = next(self._generator)
+ elif previous.cancelled():
+ self._generator.throw(asyncio.CancelledError())
+ future = next(self._generator)
+ elif previous.exception() is None:
+ future = self._generator.send(previous.result())
+ else:
+ self._generator.throw(previous.exception())
+ future = next(self._generator)
+
+ except _CoroutineReturnValue as e:
+ if not self._result.cancelled():
+ self._result.set_result(e.result)
+ except StopIteration:
+ if not self._result.cancelled():
+ self._result.set_result(None)
+ except Exception as e:
+ if not self._result.cancelled():
+ self._result.set_exception(e)
+ else:
+ future = asyncio.ensure_future(future, loop=self._loop)
+ future.add_done_callback(self._next)
--
2.13.6
Loading...