Discussion:
[gentoo-portage-dev] [PATCH 0/5] EbuildFetcher._get_uri_map(): fix event loop recursion (bug 653810)
Zac Medico
2018-04-22 22:30:09 UTC
Permalink
Bug: https://bugs.gentoo.org/653810

Zac Medico (5):
portdbapi: add async_fetch_map method (bug 653810)
EbuildFetcher: inherit CompositeTask (bug 653810)
EbuildFetcher: add _async_uri_map method (bug 653810)
EbuildFetcher: use _async_uri_map in _start (bug 653810)
EbuildFetcher: add async_already_fetched method (bug 653810)

pym/_emerge/EbuildBuild.py | 8 +++-
pym/_emerge/EbuildFetcher.py | 105 ++++++++++++++++++++++++++++++++----------
pym/portage/dbapi/porttree.py | 75 +++++++++++++++++++++++-------
3 files changed, 146 insertions(+), 42 deletions(-)
--
2.13.6
Zac Medico
2018-04-22 22:30:10 UTC
Permalink
Add a portdbapi.async_fetch_map method which is identical to the
existing portdbapi.getFetchMap method, but returns a future. This
will be used by EbuildFetcher in order to avoid event loop recursion.

Bug: https://bugs.gentoo.org/653810
---
pym/portage/dbapi/porttree.py | 75 +++++++++++++++++++++++++++++++++----------
1 file changed, 58 insertions(+), 17 deletions(-)

diff --git a/pym/portage/dbapi/porttree.py b/pym/portage/dbapi/porttree.py
index 951e5760a..3cd929963 100644
--- a/pym/portage/dbapi/porttree.py
+++ b/pym/portage/dbapi/porttree.py
@@ -728,25 +728,66 @@ class portdbapi(dbapi):
URIs.
@rtype: dict
"""
+ loop = self._event_loop
+ return loop.run_until_complete(
+ self.async_fetch_map(mypkg, useflags=useflags,
+ mytree=mytree, loop=loop))

- try:
- eapi, myuris = self.aux_get(mypkg,
- ["EAPI", "SRC_URI"], mytree=mytree)
- except KeyError:
- # Convert this to an InvalidDependString exception since callers
- # already handle it.
- raise portage.exception.InvalidDependString(
- "getFetchMap(): aux_get() error reading "+mypkg+"; aborting.")
+ def async_fetch_map(self, mypkg, useflags=None, mytree=None, loop=None):
+ """
+ Asynchronous form of getFetchMap.

- if not eapi_is_supported(eapi):
- # Convert this to an InvalidDependString exception
- # since callers already handle it.
- raise portage.exception.InvalidDependString(
- "getFetchMap(): '%s' has unsupported EAPI: '%s'" % \
- (mypkg, eapi))
-
- return _parse_uri_map(mypkg, {'EAPI':eapi,'SRC_URI':myuris},
- use=useflags)
+ @param mypkg: cpv for an ebuild
+ @type mypkg: String
+ @param useflags: a collection of enabled USE flags, for evaluation of
+ conditionals
+ @type useflags: set, or None to enable all conditionals
+ @param mytree: The canonical path of the tree in which the ebuild
+ is located, or None for automatic lookup
+ @type mypkg: String
+ @param loop: event loop (defaults to global event loop)
+ @type loop: EventLoop
+ @return: A future that results in a dict which maps each file name to
+ a set of alternative URIs.
+ @rtype: asyncio.Future (or compatible)
+ """
+ loop = loop or global_event_loop()
+ loop = getattr(loop, '_asyncio_wrapper', loop)
+ result = loop.create_future()
+
+ def aux_get_done(aux_get_future):
+ if result.cancelled():
+ return
+ if aux_get_future.exception() is not None:
+ if isinstance(future.exception(), PortageKeyError):
+ # Convert this to an InvalidDependString exception since
+ # callers already handle it.
+ result.set_exception(portage.exception.InvalidDependString(
+ "getFetchMap(): aux_get() error reading "
+ + mypkg + "; aborting."))
+ else:
+ result.set_exception(future.exception())
+ return
+
+ eapi, myuris = aux_get_future.result()
+
+ if not eapi_is_supported(eapi):
+ # Convert this to an InvalidDependString exception
+ # since callers already handle it.
+ result.set_exception(portage.exception.InvalidDependString(
+ "getFetchMap(): '%s' has unsupported EAPI: '%s'" % \
+ (mypkg, eapi)))
+ return
+
+ result.set_result(_parse_uri_map(mypkg,
+ {'EAPI':eapi,'SRC_URI':myuris}, use=useflags))
+
+ aux_get_future = self.async_aux_get(
+ mypkg, ["EAPI", "SRC_URI"], mytree=mytree)
+ result.add_done_callback(lambda result:
+ aux_get_future.cancel() if result.cancelled() else None)
+ aux_get_future.add_done_callback(aux_get_done)
+ return result

def getfetchsizes(self, mypkg, useflags=None, debug=0, myrepo=None):
# returns a filename:size dictionnary of remaining downloads
--
2.13.6
Zac Medico
2018-04-22 22:30:11 UTC
Permalink
Make EbuildFetcher inherit CompositeTask, and move remaining code
to a _EbuildFetcherProcess class that still inherits ForkProcess.
The CompositeTask framework will be used to split EbuildFetcher
into subtasks, in order to prevent event loop recursion.

Bug: https://bugs.gentoo.org/653810
---
pym/_emerge/EbuildFetcher.py | 22 +++++++++++++++++++---
1 file changed, 19 insertions(+), 3 deletions(-)

diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py
index d98d00736..81eeb6dcd 100644
--- a/pym/_emerge/EbuildFetcher.py
+++ b/pym/_emerge/EbuildFetcher.py
@@ -15,12 +15,17 @@ from portage.elog.messages import eerror
from portage.package.ebuild.fetch import _check_distfile, fetch
from portage.util._async.ForkProcess import ForkProcess
from portage.util._pty import _create_pty_or_pipe
+from _emerge.CompositeTask import CompositeTask

-class EbuildFetcher(ForkProcess):
+
+class EbuildFetcher(CompositeTask):

__slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall",
- "pkg", "prefetch") + \
- ("_digests", "_manifest", "_settings", "_uri_map")
+ "logfile", "pkg", "prefetch", "_fetcher_proc")
+
+ def __init__(self, **kwargs):
+ CompositeTask.__init__(self, **kwargs)
+ self._fetcher_proc = _EbuildFetcherProcess(**kwargs)

def already_fetched(self, settings):
"""
@@ -32,7 +37,18 @@ class EbuildFetcher(ForkProcess):
such messages. This will raise InvalidDependString if SRC_URI is
invalid.
"""
+ return self._fetcher_proc.already_fetched(settings)
+
+ def _start(self):
+ self._start_task(self._fetcher_proc, self._default_final_exit)
+

+class _EbuildFetcherProcess(ForkProcess):
+
+ __slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall",
+ "pkg", "prefetch", "_digests", "_manifest", "_settings", "_uri_map")
+
+ def already_fetched(self, settings):
uri_map = self._get_uri_map()
if not uri_map:
return True
--
2.13.6
Zac Medico
2018-04-22 22:30:12 UTC
Permalink
Add an _async_uri_map method to replace the synchronous _get_uri_map
method. This will be used to prevent event loop recursion.

Bug: https://bugs.gentoo.org/653810
---
pym/_emerge/EbuildFetcher.py | 27 ++++++++++++++++++++-------
1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py
index 81eeb6dcd..1f574740b 100644
--- a/pym/_emerge/EbuildFetcher.py
+++ b/pym/_emerge/EbuildFetcher.py
@@ -49,7 +49,7 @@ class _EbuildFetcherProcess(ForkProcess):
"pkg", "prefetch", "_digests", "_manifest", "_settings", "_uri_map")

def already_fetched(self, settings):
- uri_map = self._get_uri_map()
+ uri_map = self.scheduler.run_until_complete(self._async_uri_map())
if not uri_map:
return True

@@ -125,7 +125,7 @@ class _EbuildFetcherProcess(ForkProcess):
ebuild_path = self._get_ebuild_path()

try:
- uri_map = self._get_uri_map()
+ uri_map = self.scheduler.run_until_complete(self._async_uri_map())
except portage.exception.InvalidDependString as e:
msg_lines = []
msg = "Fetch failed for '%s' due to invalid SRC_URI: %s" % \
@@ -210,21 +210,34 @@ class _EbuildFetcherProcess(ForkProcess):
self._digests = self._get_manifest().getTypeDigests("DIST")
return self._digests

- def _get_uri_map(self):
+ def _async_uri_map(self):
"""
- This can raise InvalidDependString from portdbapi.getFetchMap().
+ This calls the portdbapi.async_fetch_map method and returns the
+ resulting Future (may contain InvalidDependString exception).
"""
if self._uri_map is not None:
- return self._uri_map
+ result = self.scheduler.create_future()
+ result.set_result(self._uri_map)
+ return result
+
pkgdir = os.path.dirname(self._get_ebuild_path())
mytree = os.path.dirname(os.path.dirname(pkgdir))
use = None
if not self.fetchall:
use = self.pkg.use.enabled
portdb = self.pkg.root_config.trees["porttree"].dbapi
- self._uri_map = portdb.getFetchMap(self.pkg.cpv,
+
+ def cache_result(result):
+ try:
+ self._uri_map = result.result()
+ except Exception:
+ # The caller handles this when it retrieves the result.
+ pass
+
+ result = portdb.async_fetch_map(self.pkg.cpv,
useflags=use, mytree=mytree)
- return self._uri_map
+ result.add_done_callback(cache_result)
+ return result

def _prefetch_size_ok(self, uri_map, settings, ebuild_path):
distdir = settings["DISTDIR"]
--
2.13.6
Zac Medico
2018-04-22 22:30:13 UTC
Permalink
Use _async_uri_map to avoid event loop recursion in _start.

Bug: https://bugs.gentoo.org/653810
---
pym/_emerge/EbuildFetcher.py | 34 ++++++++++++++++++++++------------
1 file changed, 22 insertions(+), 12 deletions(-)

diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py
index 1f574740b..8f6cc60fe 100644
--- a/pym/_emerge/EbuildFetcher.py
+++ b/pym/_emerge/EbuildFetcher.py
@@ -13,6 +13,7 @@ from portage import _unicode_decode
from portage.checksum import _hash_filter
from portage.elog.messages import eerror
from portage.package.ebuild.fetch import _check_distfile, fetch
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.ForkProcess import ForkProcess
from portage.util._pty import _create_pty_or_pipe
from _emerge.CompositeTask import CompositeTask
@@ -40,6 +41,25 @@ class EbuildFetcher(CompositeTask):
return self._fetcher_proc.already_fetched(settings)

def _start(self):
+ self._start_task(
+ AsyncTaskFuture(future=self._fetcher_proc._async_uri_map()),
+ self._start_fetch)
+
+ def _start_fetch(self, uri_map_task):
+ self._assert_current(uri_map_task)
+ try:
+ uri_map = uri_map_task.future.result()
+ except portage.exception.InvalidDependString as e:
+ msg_lines = []
+ msg = "Fetch failed for '%s' due to invalid SRC_URI: %s" % \
+ (self.pkg.cpv, e)
+ msg_lines.append(msg)
+ self._fetcher_proc._eerror(msg_lines)
+ self._current_task = None
+ self.returncode = 1
+ self._async_wait()
+ return
+
self._start_task(self._fetcher_proc, self._default_final_exit)


@@ -123,18 +143,8 @@ class _EbuildFetcherProcess(ForkProcess):
root_config = self.pkg.root_config
portdb = root_config.trees["porttree"].dbapi
ebuild_path = self._get_ebuild_path()
-
- try:
- uri_map = self.scheduler.run_until_complete(self._async_uri_map())
- except portage.exception.InvalidDependString as e:
- msg_lines = []
- msg = "Fetch failed for '%s' due to invalid SRC_URI: %s" % \
- (self.pkg.cpv, e)
- msg_lines.append(msg)
- self._eerror(msg_lines)
- self._set_returncode((self.pid, 1 << 8))
- self._async_wait()
- return
+ # This is initialized by an earlier _async_uri_map call.
+ uri_map = self._uri_map

if not uri_map:
# Nothing to fetch.
--
2.13.6
Zac Medico
2018-04-22 22:30:14 UTC
Permalink
Add an async_already_fetched method to replace the synchronous
already_fetched method, and use it to prevent event loop recursion.

Bug: https://bugs.gentoo.org/653810
---
pym/_emerge/EbuildBuild.py | 8 +++++++-
pym/_emerge/EbuildFetcher.py | 30 ++++++++++++++++++++++++------
2 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/pym/_emerge/EbuildBuild.py b/pym/_emerge/EbuildBuild.py
index 9d4afd0ea..21c7f81ce 100644
--- a/pym/_emerge/EbuildBuild.py
+++ b/pym/_emerge/EbuildBuild.py
@@ -207,8 +207,14 @@ class EbuildBuild(CompositeTask):
logfile=self.settings.get('PORTAGE_LOG_FILE'),
pkg=self.pkg, scheduler=self.scheduler)

+ self._start_task(AsyncTaskFuture(
+ future=fetcher.async_already_fetched(self.settings)),
+ functools.partial(self._start_fetch, fetcher))
+
+ def _start_fetch(self, fetcher, already_fetched_task):
+ self._assert_current(already_fetched_task)
try:
- already_fetched = fetcher.already_fetched(self.settings)
+ already_fetched = already_fetched_task.future.result()
except portage.exception.InvalidDependString as e:
msg_lines = []
msg = "Fetch failed for '%s' due to invalid SRC_URI: %s" % \
diff --git a/pym/_emerge/EbuildFetcher.py b/pym/_emerge/EbuildFetcher.py
index 8f6cc60fe..589eda85d 100644
--- a/pym/_emerge/EbuildFetcher.py
+++ b/pym/_emerge/EbuildFetcher.py
@@ -28,7 +28,7 @@ class EbuildFetcher(CompositeTask):
CompositeTask.__init__(self, **kwargs)
self._fetcher_proc = _EbuildFetcherProcess(**kwargs)

- def already_fetched(self, settings):
+ def async_already_fetched(self, settings):
"""
Returns True if all files already exist locally and have correct
digests, otherwise return False. When returning True, appropriate
@@ -38,7 +38,7 @@ class EbuildFetcher(CompositeTask):
such messages. This will raise InvalidDependString if SRC_URI is
invalid.
"""
- return self._fetcher_proc.already_fetched(settings)
+ return self._fetcher_proc.async_already_fetched(settings)

def _start(self):
self._start_task(
@@ -68,11 +68,29 @@ class _EbuildFetcherProcess(ForkProcess):
__slots__ = ("config_pool", "ebuild_path", "fetchonly", "fetchall",
"pkg", "prefetch", "_digests", "_manifest", "_settings", "_uri_map")

- def already_fetched(self, settings):
- uri_map = self.scheduler.run_until_complete(self._async_uri_map())
- if not uri_map:
- return True
+ def async_already_fetched(self, settings):
+ result = self.scheduler.create_future()
+
+ def uri_map_done(uri_map_future):
+ if uri_map_future.exception() is not None or result.cancelled():
+ if not result.cancelled():
+ result.set_exception(uri_map_future.exception())
+ return
+
+ uri_map = uri_map_future.result()
+ if uri_map:
+ result.set_result(
+ self._check_already_fetched(settings, uri_map))
+ else:
+ result.set_result(True)
+
+ uri_map_future = self._async_uri_map()
+ result.add_done_callback(lambda result:
+ aux_get_future.cancel() if result.cancelled() else None)
+ uri_map_future.add_done_callback(uri_map_done)
+ return result

+ def _check_already_fetched(self, settings, uri_map):
digests = self._get_digests()
distdir = settings["DISTDIR"]
allow_missing = self._get_manifest().allow_missing
--
2.13.6
Brian Dolbec
2018-04-23 16:10:22 UTC
Permalink
On Sun, 22 Apr 2018 15:30:09 -0700
Post by Zac Medico
Bug: https://bugs.gentoo.org/653810
portdbapi: add async_fetch_map method (bug 653810)
EbuildFetcher: inherit CompositeTask (bug 653810)
EbuildFetcher: add _async_uri_map method (bug 653810)
EbuildFetcher: use _async_uri_map in _start (bug 653810)
EbuildFetcher: add async_already_fetched method (bug 653810)
pym/_emerge/EbuildBuild.py | 8 +++-
pym/_emerge/EbuildFetcher.py | 105 ++++++++++++++++++++++++++++++++----------
pym/portage/dbapi/porttree.py | 75 +++++++++++++++++++++++-------
3 files changed, 146 insertions(+), 42 deletions(-)
I didn't see any errors glaring at me... :)

looks like it should be good
--
Brian Dolbec <dolsen>
Zac Medico
2018-04-23 18:58:32 UTC
Permalink
Post by Brian Dolbec
On Sun, 22 Apr 2018 15:30:09 -0700
Post by Zac Medico
Bug: https://bugs.gentoo.org/653810
portdbapi: add async_fetch_map method (bug 653810)
EbuildFetcher: inherit CompositeTask (bug 653810)
EbuildFetcher: add _async_uri_map method (bug 653810)
EbuildFetcher: use _async_uri_map in _start (bug 653810)
EbuildFetcher: add async_already_fetched method (bug 653810)
pym/_emerge/EbuildBuild.py | 8 +++-
pym/_emerge/EbuildFetcher.py | 105 ++++++++++++++++++++++++++++++++----------
pym/portage/dbapi/porttree.py | 75 +++++++++++++++++++++++-------
3 files changed, 146 insertions(+), 42 deletions(-)
I didn't see any errors glaring at me... :)
looks like it should be good
Thanks, merged:

https://gitweb.gentoo.org/proj/portage.git/commit/?id=0a5692bd4f4ab4a54daf2ce09112617cbc21c9ad
--
Thanks,
Zac
Loading...