Discussion:
[gentoo-portage-dev] [PATCH 0/2] ManifestScheduler: async fetchlist_dict (bug 653946)
Zac Medico
2018-04-24 23:45:57 UTC
Permalink
In order to avoid event loop recursion, pass fetchlist_dict to
ManifestTask as a Future.

Zac Medico (2):
Add iter_gather function (bug 653946)
ManifestScheduler: async fetchlist_dict (bug 653946)

.../ebuild/_parallel_manifest/ManifestScheduler.py | 82 ++++++++++++++++++----
.../ebuild/_parallel_manifest/ManifestTask.py | 22 ++++++
pym/portage/tests/dbapi/test_portdb_cache.py | 1 +
pym/portage/util/futures/iter_completed.py | 68 ++++++++++++++++++
4 files changed, 159 insertions(+), 14 deletions(-)
--
2.13.6
Zac Medico
2018-04-24 23:45:58 UTC
Permalink
This is similar to asyncio.gather, but takes an iterator of
futures as input, and includes support for max_jobs and max_load
parameters. For bug 653946, this will be used to asynchronously
gather the results of the portdbapi.async_fetch_map calls that
are required to generate a Manifest, while using the max_jobs
parameter to limit the number of concurrent async_aux_get calls.

Bug: https://bugs.gentoo.org/653946
---
pym/portage/util/futures/iter_completed.py | 68 ++++++++++++++++++++++++++++++
1 file changed, 68 insertions(+)

diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
index 5ad075305..4e52a499f 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -112,3 +112,71 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
# cleanup in case of interruption by SIGINT, etc
scheduler.cancel()
scheduler.wait()
+
+
+def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
+ """
+ This is similar to asyncio.gather, but takes an iterator of
+ futures as input, and includes support for max_jobs and max_load
+ parameters.
+
+ The returned Future resolves to the input futures themselves (not
+ to their results). If the returned Future is cancelled, the task
+ that is currently being waited on (if any, and not yet done) is
+ cancelled as well.
+
+ @param futures: iterator of asyncio.Future (or compatible)
+ @type futures: iterator
+ @param max_jobs: max number of futures to process concurrently (default
+ is multiprocessing.cpu_count())
+ @type max_jobs: int
+ @param max_load: max load allowed when scheduling a new future,
+ otherwise schedule no more than 1 future at a time (default
+ is multiprocessing.cpu_count())
+ @type max_load: int or float
+ @param loop: event loop
+ @type loop: EventLoop
+ @return: a Future resulting in a list of done input futures, in the
+ same order that they were yielded from the input iterator
+ @rtype: asyncio.Future (or compatible)
+ """
+ loop = loop or global_event_loop()
+ # Prefer the loop's _asyncio_wrapper attribute when it has one,
+ # otherwise keep using the loop object as given.
+ loop = getattr(loop, '_asyncio_wrapper', loop)
+ result = loop.create_future()
+ futures_list = []
+
+ # Record each input future in iteration order as it is consumed, so
+ # that the final result preserves the order of the input iterator.
+ def future_generator():
+ for future in futures:
+ futures_list.append(future)
+ yield future
+
+ completed_iter = async_iter_completed(
+ future_generator(),
+ max_jobs=max_jobs,
+ max_load=max_load,
+ loop=loop,
+ )
+
+ # Done callback for each item taken from completed_iter: either chain
+ # onto the next item, or resolve result once the iterator is exhausted.
+ # The most recent item is kept as an attribute of handle_result so that
+ # cancel_callback (below) can reach it.
+ def handle_result(future_done_set):
+ # Stop pulling from completed_iter once result has been cancelled.
+ if result.cancelled():
+ return
+
+ try:
+ handle_result.current_task = next(completed_iter)
+ except StopIteration:
+ result.set_result(futures_list)
+ else:
+ handle_result.current_task.add_done_callback(handle_result)
+
+ # Prime the chain with the first item from completed_iter. When the
+ # iterator yields nothing, result is resolved immediately.
+ try:
+ handle_result.current_task = next(completed_iter)
+ except StopIteration:
+ handle_result.current_task = None
+ result.set_result(futures_list)
+ else:
+ handle_result.current_task.add_done_callback(handle_result)
+
+ # Propagate cancellation of result to the item that is currently being
+ # waited on, provided there is one and it is not done yet.
+ def cancel_callback(result):
+ if (result.cancelled() and
+ handle_result.current_task is not None and
+ not handle_result.current_task.done()):
+ handle_result.current_task.cancel()
+
+ result.add_done_callback(cancel_callback)
+
+ return result
--
2.13.6
Zac Medico
2018-04-24 23:45:59 UTC
Permalink
In order to avoid event loop recursion, pass fetchlist_dict to
ManifestTask as a Future.

Bug: https://bugs.gentoo.org/653946
---
.../ebuild/_parallel_manifest/ManifestScheduler.py | 82 ++++++++++++++++++----
.../ebuild/_parallel_manifest/ManifestTask.py | 22 ++++++
pym/portage/tests/dbapi/test_portdb_cache.py | 1 +
3 files changed, 91 insertions(+), 14 deletions(-)

diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
index 38ac4825e..07794522e 100644
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
@@ -4,9 +4,9 @@
import portage
from portage import os
from portage.dep import _repo_separator
-from portage.exception import InvalidDependString
from portage.localization import _
from portage.util._async.AsyncScheduler import AsyncScheduler
+from portage.util.futures.iter_completed import iter_gather
from .ManifestTask import ManifestTask

class ManifestScheduler(AsyncScheduler):
@@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler):
cpv_list = portdb.cp_list(cp, mytree=[repo_config.location])
if not cpv_list:
continue
- fetchlist_dict = {}
- try:
- for cpv in cpv_list:
- fetchlist_dict[cpv] = \
- list(portdb.getFetchMap(cpv, mytree=mytree))
- except InvalidDependString as e:
- portage.writemsg(
- _("!!! %s%s%s: SRC_URI: %s\n") %
- (cp, _repo_separator, repo_config.name, e),
- noiselevel=-1)
- self._error_count += 1
- continue

+ # Use _future_fetchlist(max_jobs=1), since we
+ # spawn concurrent ManifestTask instances.
yield ManifestTask(cp=cp, distdir=distdir,
- fetchlist_dict=fetchlist_dict, repo_config=repo_config,
+ fetchlist_dict=_future_fetchlist(
+ self._event_loop, portdb, repo_config, cp, cpv_list,
+ max_jobs=1),
+ repo_config=repo_config,
gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars,
force_sign_key=self._force_sign_key)

@@ -91,3 +84,64 @@ class ManifestScheduler(AsyncScheduler):
noiselevel=-1)

AsyncScheduler._task_exit(self, task)
+
+
+def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list,
+    max_jobs=None, max_load=None):
+    """
+    Asynchronous form of FetchlistDict, with max_jobs and max_load
+    parameters in order to control async_aux_get concurrency.
+
+    One portdb.async_fetch_map call is issued per entry of cpv_list
+    (through iter_gather), and the returned Future resolves to a dict
+    that maps each cpv to the list form of its fetch map result.
+    Cancelling the returned Future also cancels the pending gather.
+
+    @param loop: event loop
+    @type loop: EventLoop
+    @param portdb: portdbapi instance
+    @type portdb: portdbapi
+    @param repo_config: repository configuration for a Manifest
+    @type repo_config: RepoConfig
+    @param cp: cp for a Manifest
+    @type cp: str
+    @param cpv_list: list of ebuild cpv values for a Manifest
+    @type cpv_list: list
+    @param max_jobs: max number of futures to process concurrently (default
+        is multiprocessing.cpu_count())
+    @type max_jobs: int
+    @param max_load: max load allowed when scheduling a new future,
+        otherwise schedule no more than 1 future at a time (default
+        is multiprocessing.cpu_count())
+    @type max_load: int or float
+    @return: a Future resulting in a Mapping compatible with FetchlistDict;
+        if any of the async_fetch_map futures raised, the Future carries
+        one of those exceptions instead
+    @rtype: asyncio.Future (or compatible)
+    """
+    loop = getattr(loop, '_asyncio_wrapper', loop)
+    result = loop.create_future()
+
+    def gather_done(gather_result):
+        if gather_result.cancelled():
+            # A cancelled gather has nothing to harvest, and calling
+            # gather_result.result() on it would raise. Mirror the
+            # cancellation unless result has already been resolved.
+            if not result.done():
+                result.cancel()
+            return
+
+        fetch_futures = gather_result.result()
+
+        # Retrieve exceptions from all futures (even if result has been
+        # cancelled in the meantime) in order to avoid triggering the
+        # event loop's error handler. Cancelled futures are skipped,
+        # since asking them for an exception would raise.
+        e = None
+        for future in fetch_futures:
+            if (future.done() and not future.cancelled() and
+                future.exception() is not None):
+                e = future.exception()
+
+        if result.cancelled():
+            return
+
+        if e is None:
+            result.set_result(dict((k, list(v.result()))
+                for k, v in zip(cpv_list, fetch_futures)))
+        else:
+            result.set_exception(e)
+
+    gather_result = iter_gather(
+        (portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop)
+        for cpv in cpv_list),
+        max_jobs=max_jobs,
+        max_load=max_load,
+        loop=loop,
+    )
+
+    def cancel_gather(done_result):
+        # Actually invoke cancel() (a bare attribute reference would be a
+        # no-op), and only while the gather is still pending.
+        if done_result.cancelled() and not gather_result.done():
+            gather_result.cancel()
+
+    gather_result.add_done_callback(gather_done)
+    result.add_done_callback(cancel_gather)
+
+    return result
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
index 0ee2b910d..6f5fe5b16 100644
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
@@ -8,8 +8,12 @@ import subprocess
from portage import os
from portage import _unicode_encode, _encodings
from portage.const import MANIFEST2_IDENTIFIERS
+from portage.dep import _repo_separator
+from portage.exception import InvalidDependString
+from portage.localization import _
from portage.util import (atomic_ofstream, grablines,
shlex_split, varexpand, writemsg)
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.PipeLogger import PipeLogger
from portage.util._async.PopenProcess import PopenProcess
from _emerge.CompositeTask import CompositeTask
@@ -29,6 +33,24 @@ class ManifestTask(CompositeTask):
def _start(self):
+ # The Manifest file lives at <repo location>/<cp>/Manifest.
self._manifest_path = os.path.join(self.repo_config.location,
self.cp, "Manifest")
+
+ # fetchlist_dict is handed to AsyncTaskFuture as its future, and
+ # _start_with_fetchlist is passed as the second argument of
+ # _start_task (presumably the callback run when that task exits;
+ # it takes the task as its only argument).
+ self._start_task(
+ AsyncTaskFuture(future=self.fetchlist_dict),
+ self._start_with_fetchlist)
+
+ def _start_with_fetchlist(self, fetchlist_task):
+ if self._default_exit(fetchlist_task) != os.EX_OK:
+ if not self.fetchlist_dict.cancelled():
+ try:
+ self.fetchlist_dict.result()
+ except InvalidDependString as e:
+ writemsg(
+ _("!!! %s%s%s: SRC_URI: %s\n") %
+ (self.cp, _repo_separator, self.repo_config.name, e),
+ noiselevel=-1)
+ self._async_wait()
+ return
+ self.fetchlist_dict = self.fetchlist_dict.result()
manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir,
fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config,
scheduler=self.scheduler)
diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py
index bd934460a..1f139b256 100644
--- a/pym/portage/tests/dbapi/test_portdb_cache.py
+++ b/pym/portage/tests/dbapi/test_portdb_cache.py
@@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase):
portage_python = portage._python_interpreter
egencache_cmd = (portage_python, "-b", "-Wd",
os.path.join(self.bindir, "egencache"),
+ "--update-manifests", "--sign-manifests=n",
"--repo", "test_repo",
"--repositories-configuration", settings.repositories.config_string())
python_cmd = (portage_python, "-b", "-Wd", "-c")
--
2.13.6
Loading...