# -*- coding: utf8 -*-
"""
*********************
``projectmanager.py``
*********************
`BSD`_ © 2018-2019 Science and Technology Facilities Council & contributors
.. _BSD: _static/LICENSE
``projectmanager.py`` is a module provided to setup, validate and execute source-code retrieval from
externally hosted repositories. Project specific values are extracted from the corresponding ``settings.yml``
YAML file and the projects structure is determined from the ``config.yml`` YAML file, each passed to the
module via :mod:`arguments` (see :ref:`arguments`).
"""
import helpers.customlogging as log
import helpers.version as my_version
import environmentsetup as projectenvironment
import fpgavendor_iface as fpgavendor_iface
import helpers.funcs as funcs
import importlib
import sys
import os
import re
import posixpath
import urlparse
import urllib
import getpass
import yaml
logger = log.config_logger(name=__name__)
LOGLEVEL = log.LogLevelsConsts
rev = filter(str.isdigit, "$Rev$") # Do Not Modify this Line
version = my_version.Version(1, 3, 0, svn_rev=rev, disable_svn_logging=True)
__version__ = version.get_version()
__str__ = my_version.about(__name__, __version__, version.revision)
[docs]class RepoFlow(projectenvironment.ProjectEnvironment):
"""
Parses the project settings file for repository_config key and processes locations found under the key.
This provides enough information to construct the local working_copy path using ``repo_root`` inherited
from :obj:`projectenvironment.ProjectEnvironment` and the URL of the remote repository.
Inherits: projectenvironment.ProjectEnvironment (obj): Inherited RepoFlow object.
See :obj:`projectenvironment.projectenvironment`.
Username is assumed to be the same for each repository top-level URL (using ``self.svn_username``,
``self.bitbucket_username`` and/or ``self.git_username`` inherited from
:mod:`projectenvironment.ProjectEnvironment`)
"""
def __init__(self, **kwargs):
super(RepoFlow, self).__init__()
setattr(self, 'repo_root', self.repo_root.replace('//', '/'))
self.logger.info('REPO_ROOT: {self.repo_root}'.format(self=self))
self.logger.info('SVN_USERNAME: {self.svn_username}'.format(self=self))
self.logger.info('BITBUCKET_USERNAME: {self.bitbucket_username}'.format(self=self))
self.logger.info('GIT_USERNAME: {self.git_username}'.format(self=self))
log.sect_break(self.logger)
repository_config = self.settings['repository_config']
self.repo_types = list()
if 'subversion' in repository_config:
self.logger.info('Settings Contains Subversion Repository')
self.repo_types.append('subversion')
if 'git' in repository_config:
self.logger.info('Settings Contains Git Repository')
self.repo_types.append('git')
else:
self.logger.critical('Unknown Repository Type in Settings File: {self.settings_file}'
.format(self=self))
self.dependencies = list()
for repo_type in self.repo_types:
repository_config = self.settings['repository_config'][repo_type]
self.dependencies.extend(self.process_repository_config(repository_config=repository_config,))
xml2vhdl_support = False
for depend in self.dependencies:
if depend.name == 'xml2vhdl':
xml2vhdl_support = True
setattr(self, 'xml2vhdl_support', xml2vhdl_support)
log.sect_break(self.logger)
self.logger.info('Processed Project Dependencies')
log.sect_break(self.logger)
def __repr__(self):
return "{self.__class__.__name__}()".format(self=self)
def __str__(self):
return "{} is a {self.__class__.__name__} object".format(__name__, self=self)
[docs] def process_repository_config(self, **kwargs):
"""Process repository configuration for current project
Keyword Args:
**repository_config (dict): Dictionary of Repository Configuration for Repository Type extracted
from ``settings.yml`` YAML file.
Returns:
(list of obj): A list of processed :class:`ProjectDependency` objects.
"""
repository_config = funcs.get_kwarg('repository_config', kwargs, dict())
dependencies = list()
if isinstance(repository_config, (dict,)):
for key, value in repository_config.items():
self.logger.info('Processing Dependency: {name}'
.format(name=key))
if isinstance(value, (dict,)):
if key == self.name:
top_level = True
else:
top_level = False
if value['enabled']:
try:
board_name = self.board
except AttributeError:
board_name = None
try:
board_settings = self.board_settings
except AttributeError:
board_settings = None
try:
family = self.family
except AttributeError:
family = None
try:
device = self.device
except AttributeError:
device = None
try:
vunit_default = value['vunit_default']
except KeyError:
vunit_default = False
try:
required_vhdl_libs = value['required_vhdl_libs']
except KeyError:
required_vhdl_libs = list()
self.logger.debug('**value: {value}'.format(value=value.items()))
remote_path, username, repo_type = self.construct_remote_path(**value)
if repo_type == 'git':
parsed_url = urlparse.urlsplit(value['url'])
git_path = os.path.split(parsed_url.path)[0]
urlunsplit_param = ''
urlunsplit_query = ''
urlunsplit_frag = ''
url = urlparse.urlunparse((parsed_url.scheme,
parsed_url.netloc,
git_path,
urlunsplit_param,
urlunsplit_query,
urlunsplit_frag))
else:
url = value['url']
try:
remote_rev = value['rev']
except KeyError:
remote_rev = ''
local_path = self.construct_local_path(root=self.repo_root,
path=value['path'])
category = self.get_category_from_path(supported_categories=self.categories,
category_mapping=self.category_mapping,
path=value['path'])
value.update(name=key)
if category == 'library':
cat_mapping = self.category_mapping
value.update(contrib_lib=self.get_contrib_lib_from_path(name=key,
category_mapping=cat_mapping,
path=value['path']))
else:
value.update(contrib_lib=None)
required_fields_dict = dict()
for attr in self.config['repository']['required']:
if attr in value:
required_fields_dict.update({attr: value[attr]})
self.logger.debug('\tUsing Attribute from Dependency: {attr}'
.format(attr=attr))
else:
required_fields_dict.update({attr: getattr(self, attr)})
self.logger.debug('\tUsing Inherited Attribute: {attr}'
.format(attr=attr))
dependencies.append(ProjectDependency(repo_root=self.repo_root,
name=key,
category=category,
url=url,
local_path=local_path,
remote_path=remote_path,
remote_rev=remote_rev,
required_vhdl_libs=required_vhdl_libs,
vunit_default=vunit_default,
top_level=top_level,
board_name=board_name,
board_settings=board_settings,
family=family,
device=device,
lib_append_categories=self.lib_append_categories,
repo_config_dict=self.config['repository'],
tag_replacement_dict=required_fields_dict))
if not self.checkout_disabled:
self.logger.debug('Checking Out: {name}'
.format(name=key))
try:
if url in _url_cache:
_url_cache = self._url_cache_handler(url=url,
cache=_url_cache,
username=username,)
except NameError:
self.logger.info('Generating URL Cache: {url}'
.format(url=url))
_url_cache = self._url_cache_handler(url=url,
cache={url: False},
username=username,)
if 'branch' in value:
branch = value['branch']
else:
branch = 'master'
self._checkout(cache_dict=_url_cache,
repo_type=repo_type,
url=url,
remote_path=remote_path,
remote_rev=remote_rev,
local_path=local_path,)
else:
self.logger.info('Repository Checkout Disabled')
if os.path.exists(local_path):
self.logger.info('Local Path Found on File-System: {path}'
.format(path=local_path))
else:
self.logger.critical('Local Path Missing on File-System: {path}'
.format(path=local_path))
self.logger.critical('\tDesign Might Not Compile...')
self.logger.critical('\tConfigure Project to Checkout Location')
else:
self.logger.warning('Processing for "{depend}" Disabled in Settings File: {file_ref}'
.format(depend=key,
file_ref=self.settings_file))
log.sect_break(self.logger)
return dependencies
def _url_cache_handler(self, **kwargs):
"""Caches Passwords for Accessed Controlled URLs.
To stop the script from requesting passwords for each checkout, if the repository root URL is the
same, this will cache the password so it only needs to be entered at the console once.
Args:
**url (str, optional): Default value: ``None``
**url_cache_dict (dict, optional): Default value: ``None``
**username (str, optional): Default value: ``''``
Returns:
None
"""
url = funcs.get_kwarg('url', kwargs, None)
url_cache_dict = funcs.get_kwarg('cache', kwargs, None)
username = funcs.get_kwarg('username', kwargs, '')
# Setup Password Prompt for Use by getpass:
parsed_url = urlparse.urlsplit(url)
if username != "":
username_netloc = username + '@' + parsed_url.netloc
else:
username_netloc = parsed_url.netloc
urlunsplit_param = ''
urlunsplit_query = ''
urlunsplit_frag = ''
prompt_url = urlparse.urlunparse((parsed_url.scheme,
username_netloc,
parsed_url.path,
urlunsplit_param,
urlunsplit_query,
urlunsplit_frag))
getpass_prompt = 'Password for {url}:'.format(url=prompt_url)
if url in url_cache_dict:
self.logger.debug('URL Already a key in url_cache_dict: {url}'
.format(url=url))
else:
self.logger.debug('Adding URL as key in url_cache_dict: {url}'
.format(url=url))
url_cache_dict[url] = False
if not url_cache_dict[url]:
self.logger.info('No Password Cached for URL: {url}'
.format(url=url))
if not self.dev_flag:
url_cache_dict[url] = getpass.getpass(prompt=getpass_prompt)
else:
self.logger.critical('Running Script in Dev Mode: Password Caching Disabled')
else:
self.logger.info('Password Already Cached for URL: {url}'
.format(url=url))
return url_cache_dict
[docs] def construct_remote_path(self, **kwargs):
"""Constructs the remote path in the externally hosted repository.
Determines if the URL is ``git`` and/or ``bitbucket`` from ``urlparse.netloc``
Uses ``urlparse`` to construct URL, where ``username`` is constructed manually at the front of
``urlparse.netloc``
Uses ``posixpath`` to construct path portion of URL by joining ``urlparse.path``, ``path``
and ``subpath``.
Args:
**url (str): The top-level root URL. See :ref:`settings_subversion`. Default value: ``None``
**path (str): The path. See :ref:`settings_subversion`. Default value: ``None``
**subpath (str): The sub-path. See :ref:`settings_subversion`. Default value: ``None``
Returns:
tuple containing:
* (str): Full path of the remote URL.
* (str): Username for remote URL.
* (str): Repo type <``'subversion'`` or ``'git'``>. Determined from URL.
"""
url = funcs.get_kwarg('url', kwargs, None)
path = funcs.get_kwarg('path', kwargs, None)
subpath = funcs.get_kwarg('subpath', kwargs, None)
parsed_url = urlparse.urlsplit(url)
if '.git' in parsed_url.path:
self.logger.info('Processing a git Repository URL: {url}'.format(url=url))
repo_type = 'git'
if 'bitbucket' in parsed_url.netloc:
username = self.bitbucket_username
else:
username = self.git_username
remote_path = self._url_handler(url=url,
username=username,
passwd=None,
preprocessed_path=None)
else:
self.logger.info('Processing a Subversion Repository URL: {url}'.format(url=url))
repo_type = 'subversion'
username = self.svn_username
remote_path = self._url_handler(url=url,
username=username,
passwd=None,
preprocessed_path=posixpath.join(parsed_url.path, path, subpath))
self.logger.info('Constructed Full Remote Repository URL: {remote_path}'
.format(remote_path=remote_path))
return remote_path, username, repo_type
def _url_handler(self, url, username, passwd=None, preprocessed_path=None):
"""
Args:
url (str): Fully formatted URL.
username (str): Username to into URL.
passwd (str, optional): Password to insert into URL.
preprocessed_path (str, optional): A pre-processed path to use instead of the path extracted from
``url``.
Returns:
str: Formed URL
"""
urlunsplit_param = ''
urlunsplit_query = ''
urlunsplit_frag = ''
parsed_url = urlparse.urlsplit(url)
if '@' in parsed_url.netloc:
self.logger.debug('Username Found in URL: {netloc}'.format(netloc=parsed_url.netloc))
netloc = parsed_url.netloc.split('@')
username = netloc[0]
netloc = netloc[1]
self.logger.debug('\tUsing Username from URL: {username}'.format(username=username))
else:
self.logger.debug('No Username Found in URL...')
netloc = parsed_url.netloc
if preprocessed_path and preprocessed_path != "":
url_path = preprocessed_path
else:
url_path = parsed_url.path
if username and username != "":
if passwd:
constructed_netloc = '{username}:{passwd}@{netloc}'.format(username=username,
passwd=urllib.quote_plus(passwd),
netloc=netloc)
else:
constructed_netloc = '{username}@{netloc}'.format(username=username,
netloc=netloc)
else:
constructed_netloc = parsed_url.netloc
constructed_url = urlparse.urlunparse((parsed_url.scheme,
constructed_netloc,
url_path,
urlunsplit_param,
urlunsplit_query,
urlunsplit_frag))
return constructed_url
[docs] def construct_local_path(self, **kwargs):
"""Constructs the local path for the local working-copy.
Constructed by joining ``root`` and ``path``.
Args:
**root (str): The top-level root. Usually derived from the ``$REPO_ROOT`` environment variable.
Default value: ``None``
**path (str): The path, derived from :ref:`settings_subversion`, but with the ``subpath:``
removed. This allows the switching between ``trunk``, ``branches`` and ``tags`` to be
managed by the repository client. Default value: ``None``
Returns:
(str): Full path of the local working-copy path.
"""
root = funcs.get_kwarg('root', kwargs, None)
path = funcs.get_kwarg('path', kwargs, None)
local_path = os.path.join(root, path)
self.logger.info('Constructed Full Working Copy Path: {local_path}'
.format(local_path=local_path))
return local_path
def _checkout(self, cache_dict, repo_type='subversion', **kwargs):
"""Validates checkout parameters and checks out the source-code from remote location
Args:
cache_dict (dict): Default value: ``None``
repo_type (str, optional): Operating on either a ``'subversion'`` or ``'git'`` repository.
Default value: ``'subversion'``.
Keyword Args:
**url (str): Default value: ``None``.
**remote_path (str): Default value: ``None``.
**remote_rev (str): Default value: ``None``.
**local_path (str): Default value: ``None``.
**branch (str): git branch to clone Default value: ``master``.
Returns:
(bool): ``False`` if trying to operate on a ``git`` repository.
"""
url = funcs.get_kwarg('url', kwargs, None)
remote_path = funcs.get_kwarg('remote_path', kwargs, None)
remote_rev = funcs.get_kwarg('remote_rev', kwargs, None)
local_path = funcs.get_kwarg('local_path', kwargs, None)
branch = funcs.get_kwarg('branch', kwargs, 'master')
wc_exists = False
if os.path.exists(local_path):
self.logger.warning('Local Path Already Exists for Checkout Operation: {path}'
.format(path=local_path))
local_repo_obj = self._set_working_copy(local_path=local_path,
repo_type=repo_type)
if local_repo_obj and self._check_working_copy(remote_type=repo_type,
local_repo_obj=local_repo_obj,
remote_path=remote_path):
self.logger.warning('Working Copy Location is Already Checked Out. '
'May Require Updating: {path}'
.format(path=local_path))
wc_exists = True
else:
return False
if not wc_exists:
remote_repo_obj = self._set_remote(cache_dict, repo_type, **kwargs)
if remote_repo_obj:
if repo_type == 'subversion':
self.logger.info('Performing Subversion Repository Check Out:')
self.logger.info('\t{remote_path}@{remote_rev} -> {local_path}'
.format(remote_path=remote_path,
remote_rev=remote_rev,
local_path=local_path))
remote_repo_obj.checkout(local_path, remote_rev)
return True
elif repo_type == 'git':
import git as git
for l in ['git']:
log.disable_logger(log, l)
self.logger.info('Performing Git Repository Check Out:')
self.logger.info('\t{remote_path} -> {local_path}'
.format(remote_path=remote_path, local_path=local_path))
git_url = self._url_handler(url=remote_path,
username=None,
passwd=cache_dict[url],
preprocessed_path=None)
try:
repo = git.Repo.clone_from(url=git_url, to_path=local_path, b=branch)
return True
except git.exc.GitCommandError as e:
self.logger.error('Could Not Clone Repository...')
for l in str(e).splitlines():
if urllib.quote_plus(cache_dict[url]) not in l:
self.logger.error(l)
log.errorexit(self.logger)
else:
self.logger.critical('Problem Performing {repo_type} Checkout:'
.format(repo_type=repo_type.capitalize()))
self.logger.critical('\t{remote_path}@{remote_rev} -> {local_path}'
.format(remote_path=remote_path,
remote_rev=remote_rev,
local_path=local_path))
self.logger.critical('Skipping...')
return False
def _set_working_copy(self, repo_type='subversion', **kwargs):
"""Connects to an existing local working_copy.
Args:
repo_type (str, optional): Operating on either a ``'subversion'`` or ``'git'`` repository.
Default value: ``'subversion'``.
Keyword Args:
**local_path (str): Default value: ``None``.
Returns:
(bool): ``False`` if trying to operate on a ``git`` repository.
"""
local_path = funcs.get_kwarg('local_path', kwargs, None)
if repo_type == 'subversion':
import svn.local
repo_obj = svn.local.LocalClient(local_path)
elif repo_type == 'git':
repo_obj = False
return repo_obj
def _check_working_copy(self, repo_type='subversion', **kwargs):
"""Checks existing local working_copy against expected working-working parameters.
If a working-copy already exists in the local file-system, check to see if it matches the expected
remote working-copy.
Args:
repo_type (str, optional): Operating on either a ``'subversion'`` or ``'git'`` repository.
Default value: ``'subversion'``.
Keyword Args:
**local_path (str): Default value: ``None``.
Returns:
(bool): ``False`` if trying to operate on a ``git`` repository.
"""
repo_obj = funcs.get_kwarg('local_repo_obj', kwargs, None)
remote_path = funcs.get_kwarg('remote_path', kwargs, None)
if repo_type == 'subversion':
# Fetch the Associated URL from the Working Copy
try:
repo_info = repo_obj.info()
wc_url = repo_info['url']
except Exception as e:
self.logger.critical('Failed to Connect with Working Copy')
self.logger.critical('\t{msg}'
.format(msg=e))
self.logger.critical('\tSkipping...')
return False
elif repo_type == 'git':
# \TODO Fetch the Associated URL from the Working Copy when using git and return True
return False
if wc_url == remote_path:
self.logger.info('URL Match: Valid Working Copy:')
self.logger.info('\t{url}'
.format(url=remote_path))
return True
else:
self.logger.critical('URL Mismatched: Invalid Working Copy:')
self.logger.critical('\tExpected: {url}'
.format(url=remote_path))
self.logger.critical('\tGot: {url}'
.format(url=wc_url))
return False
def _set_remote(self, cache_dict, repo_type='subversion', **kwargs):
""" Sets the remote repository for performing Checkout/Clone operations.
Args:
cache_dict (dict): Default value: ``None``
repo_type (str, optional): Operating on either a ``'subversion'`` or ``'git'`` repository.
Default value: ``'subversion'``.
Keyword Args:
**url (str): Default value: ``None``.
**remote_path (str): Default value: ``None``.
Returns:
(bool): ``False`` if trying to operate on a ``git`` repository.
"""
url = funcs.get_kwarg('url', kwargs, None)
remote_path = funcs.get_kwarg('remote_path', kwargs, None)
if repo_type == 'subversion':
import svn.remote
for l in ['svn']:
log.disable_logger(log, l)
self.logger.info('Connecting to Subversion Repository...')
remote_repo_obj = svn.remote.RemoteClient(remote_path,
username=self.svn_username,
password=cache_dict[url])
try:
repo_info = remote_repo_obj.info()
self.logger.info('Successfully Connected to Remote Location: {url}'
.format(url=remote_path))
return remote_repo_obj
except Exception as e:
self.logger.critical('Failed to Connect to Remote Location: {url}'
.format(url=remote_path))
self.logger.critical('Skipping...')
return False
elif repo_type == 'git':
return True
[docs] def get_category_from_path(self, **kwargs):
"""Gets ``category:`` from ``path``.
The first directory in the path string represents the category.
Keyword Args:
**path (str): Default value: ``''``
**category_mapping (dict, optional): Category mapping (see :ref:`config_categories`).
Default value: ``dict()``.
**supported_categories (list of str, optional): List of supported categories from ``config.yml``
configuration file (see :ref:`config_categories`). Default value: ``list()``.
Returns:
(str): the ``category`` extracted from ``path``.
"""
path = funcs.get_kwarg('path', kwargs, '')
category_mapping = funcs.get_kwarg('category_mapping', kwargs, dict())
supported_categories = funcs.get_kwarg('supported_categories', kwargs, list())
cat = path.split('/')[0]
for k, v in category_mapping.iteritems():
if v == cat:
cat = k
if cat in supported_categories:
self.logger.info('Category: {cat}'
.format(cat=cat))
return cat
else:
self.logger.error('Could Not Resolve Category from Path')
self.logger.error('\tPath: {path}'
.format(path=path))
self.logger.error('\tCategory: {cat}'
.format(cat=cat))
self.logger.error('\tSupported Categories: {cats}'
.format(cats=supported_categories))
log.errorexit(self.logger)
[docs] def get_contrib_lib_from_path(self, **kwargs):
"""Gets ``contrib_lib:`` from ``path``.
Assumes (i know!) that ``contrib_lib`` exists between the ``category`` and ``name``.
For a valid ``contrib_lib`` to exist the number of elements in ``path`` should be = 3.
Keyword Args:
**name (str): The project name being processed. Default value: ``''``
**path (str): The path of the project on the file-system. Default value: ``''``
**category_mapping (dict, optional): Category mapping (see :ref:`config_categories`).
Default value: ``dict()``.
Returns:
(str): the ``contrib_lib`` extracted from ``path``.
"""
name = funcs.get_kwarg('name', kwargs, '')
path = funcs.get_kwarg('path', kwargs, '')
category_mapping = funcs.get_kwarg('category_mapping', kwargs, dict())
path_elements = path.split('/')
self.logger.debug('Nof Path Elements: {len}'
.format(len=len(path_elements)))
cat = path_elements[0]
if len(path_elements) == 3:
for k, v in category_mapping.iteritems():
if v == cat:
cat = k
contrib_lib = path_elements[1]
path_name = path_elements[2]
self.logger.debug('Category : {cat}'
.format(cat=cat))
self.logger.debug('Contrib : {contrib_lib}'
.format(contrib_lib=contrib_lib))
self.logger.debug('Path Name: {path_name}'
.format(path_name=path_name))
if path_elements[2] == name:
contrib_lib = path_elements[1]
self.logger.info('Contributor: {contrib_lib}'
.format(contrib_lib=contrib_lib))
return contrib_lib
# If it gets this far raise a warning because no valid contrib_lib was found
self.logger.warning('No Contributor for: {name}'
.format(name=name))
return None
[docs]class ProjectDependency(object):
"""Processes project dependencies.
Takes a dictionary and converts keys to attributes before parsing configuration to construct all
attributes required to describe a Dependency Object
Args:
repo_root (str): The resolved ``$REPO_ROOT`` system environment variable. Required by:
:mod:`projectmanager.ProjectDependency.layout_to_dir` and
:mod:`projectmanager.ProjectDependency.prune_path_list`
name (str): Name of dependency
category (str): The dependency's category
url (str): Root URL to remote repository location for the dependency
local_path (str): Local path of the *working copy* for the dependency
remote_path (str): Remote path, from the ``url`` for the dependency
remote_rev (str): Specific repository revision for the dependency
required_vhdl_libs (list of str, optional): List of required HDL libraries required by the dependency.
Default value: ``list()``
vunit_default (bool, optional): VUnit dependency flag. Default value: ``False``
top_level (bool, optional): Project Top-level dependency flag. Default value: ``False``
board (str, optional): Target board for dependency. Default value: ``None``
board_settings (str, optional): Settings file name for target board for dependency.
Default value: ``None``
family (str, optional): Target FPGA Family for HDL dependency. Default value: ``None``
device (str, optional): Target FPGA Device for HDL dependency. Default value: ``None``
Keyword Args:
**repo_config_dict (dict): The layout dictionary which defines the projects layout within the
repository and working copy on the local file-system.
Required by :mod:`projectmanager.ProjectDependency.layout_to_dir`
Default value: ``dict()``
**lib_append_categories (list of str): Default value: ``list()``
**bitbucket_username (str, optional): Default value: ``None``
**tag_replacement_dict (dict): Dictionary of required tag replacement fields required by
:mod:`projectmanager.ProjectDependency.layout_tag_attr_replace`. This is variable based on
``category``
Default value: ``dict()``
**retag_layout (bool, optional). Retags the layout list (for 'vendor_ip'. Default value: ``True``
"""
def __init__(self, repo_root, name, category, url, local_path, remote_path, remote_rev,
required_vhdl_libs=list(), vunit_default=False, top_level=False,
board_name=None, board_settings=None, family=None, device=None, **kwargs):
self.logger = log.config_logger(name=__name__, class_name=self.__class__.__name__)
setattr(self, 'category', category)
setattr(self, 'top_level', top_level)
setattr(self, 'required_vhdl_libs', required_vhdl_libs)
setattr(self, 'vunit_default', vunit_default)
setattr(self, 'url', url)
setattr(self, 'local_path', local_path)
setattr(self, 'remote_path', remote_path)
setattr(self, 'remote_rev', remote_rev)
setattr(self, 'family', family)
setattr(self, 'device', device)
lib_append_categories = funcs.get_kwarg('lib_append_categories', kwargs, list())
retag_layout = funcs.get_kwarg('retag_layout', kwargs, True)
if vunit_default:
lib = self.get_lib_from_name(name=name,
category=category,
lib_append_categories=lib_append_categories)
else:
lib = None
setattr(self, 'lib', lib)
# setattr(self, 'bitbucket_username', funcs.get_kwarg('bitbucket_username', kwargs, None))
repo_config_dict = funcs.get_kwarg('repo_config_dict', kwargs, dict())
self.logger.debug('Tag Replacements:')
for k, v in funcs.get_kwarg('tag_replacement_dict', kwargs, dict()).items():
self.logger.debug('\t{k}:{v}'.format(k=k, v=v))
setattr(self, k, v)
layout_str = self.category + '_layout'
self.logger.debug('Using Layout: {layout}'
.format(layout=layout_str))
setattr(self, '_temp_layout_list', list())
_layout_dict = repo_config_dict[layout_str]
self._top_dirs = list()
self.layout_list = list()
top_refs = self.set_top_refs(category, name, self.tool_version)
if category == 'boards':
setattr(self, 'is_board', True)
if board_name:
self.logger.info('Dependency is a Board: Using Board Name as Name: {board_name}'
.format(board_name=board_name))
setattr(self, 'name', board_name)
else:
self.logger.critical('Dependency is a Board: But No Board Name Supplied, Using: {name}'
.format(name=name))
setattr(self, 'name', name)
if board_settings:
if board_settings.endswith('.yml'):
setattr(self, 'board_settings', board_settings)
else:
setattr(self, 'board_settings', board_settings + '.yml')
else:
default_settings_file = 'default_settings.yml'
self.logger.warning('No Board Settings File Name Supplied. Using: {default_settings_file}'
.format(default_settings_file=default_settings_file))
setattr(self, 'board_settings', default_settings_file)
else:
setattr(self, 'name', name)
setattr(self, 'is_board', False)
setattr(self, 'board_settings', None)
self.layout_to_dir(layout_dict=_layout_dict,
repo_root=repo_root,
top_refs=top_refs,
replace_tags=True)
for kw in self._top_dirs:
attr_name = self.top_dir_lookup(dir_name=kw)
path_list = self.prune_path_list(path_list=self.layout_list,
root_path=repo_root,
keyword=kw)
if path_list:
if len(path_list) == 1:
self.logger.debug('Created Attribute: {attr} {value}'
.format(attr=attr_name, value=path_list[0]))
setattr(self, attr_name, path_list[0])
else:
self.logger.warning('More than One Path Found when Generating {attr}_dir: {path_list}'
.format(attr=kw, path_list=path_list))
self.logger.warning('Created Attribute from First Value: {attr} {value}'
.format(attr=attr_name, value=path_list[0]))
setattr(self, attr_name, path_list[0])
else:
self.logger.debug('Not Creating _dir Attribute...')
if category == 'vendor_ip' and retag_layout:
setattr(self, 'layout_dict', _layout_dict)
retagged_layout_list = self.retag_path(term='vendor_ip',
tag='<NAME>',
paths=self.layout_list,
repo_root=repo_root)
retagged_layout_list = self.retag_path(term=self.vendor,
tag='<VENDOR>',
paths=retagged_layout_list,
repo_root=repo_root)
if self.tool_version:
retagged_layout_list = self.retag_path(term=self.tool_version,
tag='<TOOL_VERSION>',
paths=retagged_layout_list,
repo_root=repo_root)
if family:
retagged_layout_list = self.retag_path(term=family,
tag='<FAMILY>',
paths=retagged_layout_list,
repo_root=repo_root)
if device:
retagged_layout_list = self.retag_path(term=device,
tag='<DEVICE>',
paths=retagged_layout_list,
repo_root=repo_root)
setattr(self, 'layout_list', retagged_layout_list)
for p in self.layout_list:
self.logger.debug('Rebuilt Vendor IP Layout: {p}'.format(p=p))
elif category == 'vendor_ip':
pass
else:
pass
[docs] def retag_path(self, **kwargs):
"""Re-tags path(s).
Keyword Args:
**term (str): Term to retag.
**tag (str): Tag value to replace term with.
**paths (str or list of str): absolute path(s) to retag. This assumes the absolute path is in the
form: ``$REPO_ROOT/<TERM>/xx/xx/xx/<TERM>`` where the second term will be replaced with
``tag``.
**repo_root (str): The actual path of the system environment variable ``$REPO_ROOT``
Returns:
list of str: List of retagged absolute paths. Empty ``list()`` if no repo_root supplied.
"""
term = funcs.get_kwarg('term', kwargs, None)
tag = funcs.get_kwarg('tag', kwargs, '<NAME>')
paths = funcs.get_kwarg('paths', kwargs, list())
repo_root = funcs.get_kwarg('repo_root', kwargs, None)
if isinstance(paths, (str, )) and paths != "":
paths = [paths]
if repo_root:
term_root = os.path.join(repo_root, term)
else:
return list()
rebuild_layout_list = list()
for p in paths:
try:
split_path = p.split(term_root)[1][1:]
except IndexError:
term_root = repo_root
split_path = p.split(term_root)[1][1:]
renamed_path = split_path.replace(term, tag)
rebuilt_path = os.path.join(term_root, renamed_path)
rebuild_layout_list.append(rebuilt_path)
self.logger.debug('Rebuilt Layout: {p}'.format(p=rebuilt_path))
return rebuild_layout_list
[docs] def get_lib_from_name(self, **kwargs):
"""Determines the ``VHDL`` library name from project name.
Determines the ``VHDL`` library name from the project name, ignoring if suffix already exists.
the suffix is only appended if the ``category`` exists in the ``lib_append_categories`` list
Keyword Args:
**name (str): The project name being processed. Default value: ``''``
**category (str): The ``category`` of the project being processed. Default value: ``''``
**lib_append_categories (list of str, optional): List of categories to append ``lib_suffix``
from ``config.yml`` configuration file (see :ref:`config_categories`). If the ``category``
is **not** in this list the suffix will **not** be appended. Default value: ``list()``.
**lib_suffix (str, optional): The library suffix to append. Default value: ``'_lib'``
Returns:
(str): The name of the ``VHDL`` library
"""
name = funcs.get_kwarg('name', kwargs, '')
category = funcs.get_kwarg('category', kwargs, '')
lib_append_categories = funcs.get_kwarg('lib_append_categories', kwargs, list())
lib_suffix = funcs.get_kwarg('lib_suffix', kwargs, '_lib')
suffix_len = len(lib_suffix)
suffix_idx = -suffix_len
self.logger.debug('Determining Library from Name: {name}'
.format(name=name))
self.logger.debug('Library Suffix: {suffix} [{idx}]'
.format(suffix=lib_suffix, idx=suffix_idx))
if category in lib_append_categories:
if len(name) > suffix_len:
if name[suffix_idx:] != lib_suffix:
name = name + lib_suffix
else:
name = name + lib_suffix
self.logger.info('Library Name: {name}'
.format(name=name))
return name
[docs] @staticmethod
def top_dir_lookup(**kwargs):
"""Maps supported top-level directory names to known values for specified usage throughout
Keyword Args:
**dir_name (str): Name of directory name to attempt to map.
Returns:
str: Mapped name if matched. ``dir_name`` if *not* matched.
"""
dir_name = funcs.get_kwarg('dir_name', kwargs, '')
if dir_name in ['constraints', 'constr', 'constrs']:
name = 'constraints'
elif dir_name in ['docs', 'doc', 'documents', 'documentation']:
name = 'docs'
elif dir_name in ['netlists', 'net', 'nets']:
name = 'netlists'
elif dir_name in ['scripts', 'script']:
name = 'scripts'
elif dir_name in ['settings', 'setting']:
name = 'settings'
elif dir_name in ['simulation', 'sim', 'sims']:
name = 'simulation'
elif dir_name in ['src', 'srcs', 'sources']:
name = 'src'
else:
name = dir_name
return '{attr}_dir'.format(attr=name)
[docs] @staticmethod
def set_top_refs(category, name, tool_version):
"""
Args:
category (str): category to determine the top-level reference to return.
name (str): returned if ``category`` is *not* ``boards`` or ``vendor_ip``
tool_version (str): returned if ``category`` *is* ``boards`` or ``vendor_ip``
Returns:
list of str
"""
if category == 'boards':
return [tool_version]
elif category == 'vendor_ip':
return [tool_version]
else:
return [name]
[docs] def layout_to_dir(self, **kwargs):
"""Takes a single top-level and creates the directory and file structure as a list.
This layout structure is expected to be defined in the layout_dict keyword passed to the module.
Uses top_refs list to look for a reference to indicate the top reference string, which is
used to populate self._top_dirs, this can then be used to create attributes for top_level reference
directories.
Keyword Args:
**layout_dict (dict): Default value: ``dict()``
**repo_root (str): Default value: ``None``
**top_refs (list of str): Default value: ``list()``
**replace_tags (bool): Default value: ``True``
Returns:
None
"""
layout_dict = funcs.get_kwarg('layout_dict', kwargs, dict())
repo_root = funcs.get_kwarg('repo_root', kwargs, None)
top_refs = funcs.get_kwarg('top_refs', kwargs, list())
replace_tags = funcs.get_kwarg('replace_tags', kwargs, True)
self.logger.debug('layout_to_dir()replace_tags: {replace_tags}'
.format(replace_tags=replace_tags))
self.logger.debug('layout_to_dir():layout_dict: {layout_dict}'
.format(layout_dict=layout_dict))
self.logger.debug('layout_to_dir():repo_root: {repo_root}'
.format(repo_root=repo_root))
if isinstance(layout_dict, (dict,)):
for key, value in layout_dict.items():
if replace_tags:
key = self.layout_tag_attr_replace(tag=key)
if key in top_refs:
if isinstance(value, (list, dict,)):
for i in value:
self._top_dirs.append(i.keys()[0])
self.logger.debug('layout_to_dir():key:{}'.format(key).lower())
self.logger.debug('1:layout_to_dir({}, {})'.format(value, os.path.join(repo_root, key)))
if value:
self.layout_to_dir(layout_dict=value,
repo_root=os.path.join(repo_root, key),
top_refs=top_refs,
replace_tags=replace_tags)
else:
self.logger.debug(os.path.join(repo_root, key))
self.layout_list.append(os.path.join(repo_root, key))
elif isinstance(layout_dict, (list,)):
for item in layout_dict:
if isinstance(item, (dict,)):
self.logger.debug('2:layout_to_dir({}, {})'.format(item, repo_root))
self.layout_to_dir(layout_dict=item,
repo_root=repo_root,
top_refs=top_refs,
replace_tags=replace_tags)
else:
if replace_tags:
item = self.layout_tag_attr_replace(tag=item)
self.logger.debug('os.repo_root.join({}, {})'.format(repo_root, item))
self.logger.debug(os.path.join(repo_root, item))
self.layout_list.append(os.path.join(repo_root, item))
[docs] def layout_tag_attr_replace(self, **kwargs):
"""Replaces ``<TAG>`` with corresponding attribute
Looks for ``<TAG>`` (enclosed in ``<>``) in tag string passed to the function, and if found
returns a string with the tag replaced by the value in the ``self.<TAG>`` attribute.
If the attribute exists but is not populated with a value an "empty" string ``''`` is returned.
This allows all <TAG> options in the layout configuration to be optional.
Although ``self.name`` should always have a value as it is also used to determine filenames
in the directory structure.
Keyword Args:
**tag (str): string to check for sub-strings enclosed in ``<>``.
Returns:
(str): An empty string if corresponding attribute for the ``<TAG>`` is **not** found; or the
corresponding attribute if found; or the original ``<TAG>`` if anything else.
"""
tag = funcs.get_kwarg('tag', kwargs, None)
if '<' in tag and '>' in tag:
attribute_tag = funcs.strip_tag(tag)
self.logger.debug('Processing {tag}: {attribute_tag}'
.format(tag=tag, attribute_tag=attribute_tag))
else:
self.logger.debug('Not a Valid Tag: {tag}'
.format(tag=tag))
return tag
if hasattr(self, attribute_tag):
if not getattr(self, attribute_tag):
self.logger.critical('Attribute Not Found: {}'
.format(attribute_tag))
return ''
else:
self.logger.debug('Valid Tag {tag}: {attribute_tag}'
.format(tag=tag, attribute_tag=attribute_tag))
resolved_attr = getattr(self, attribute_tag)
self.logger.debug('Resolved Attribute from Tag: {}'
.format(resolved_attr))
return re.sub('(<[^>]+>)', resolved_attr, tag)
else:
self.logger.debug('Attribute Not Found: {}'
.format(attribute_tag))
return tag
[docs] def prune_path(self, **kwargs):
"""Prunes path down towards ``root_path``.
Takes a path and prunes that path until prune_to is found, if root_path is reached it means that
prune_to was not found in the path.
Returns path up to and including prune_to, or False if not found.
Keyword Args:
**path (str): Path to prune. Default value: ``None``
**prune_to (str): Prune down to this path. Default value: ``None``
**root_path (str): If ``root_path`` is reached the prune fails. Default value: ``None``
Returns:
(str): The found pruned path. The `bool:` ``False`` is returned if ``root_path`` is reached
without finding the prune criteria.
"""
path = funcs.get_kwarg('path', kwargs, None)
prune_to = funcs.get_kwarg('prune_to', kwargs, None)
root_path = funcs.get_kwarg('root_path', kwargs, None)
while True:
if os.path.split(path)[1] == prune_to:
self.logger.debug('Pruned Path: {}'
.format(path))
return path
elif os.path.split(path)[0] == root_path:
self.logger.critical('{prune} Not found in: {path}'
.format(prune=prune_to, path=path))
return False
else:
path = os.path.split(path)[0]
[docs] def prune_path_list(self, **kwargs):
"""Prunes list of paths based on ``keyword`` matching.
Keyword Args:
**path_list (list of str): List of paths to prune. Default value: ``None``
**keyword (str): Keyword which **must** exist in ``path_list`` item for prune.
Default value: ``''``
**root_path (str): If ``root_path`` is reached the prune fails. Default value: ``None``
Returns:
(list of str): A list of pruned paths.
"""
path_list = funcs.get_kwarg('path_list', kwargs, None)
keyword = funcs.get_kwarg('keyword', kwargs, '')
root_path = funcs.get_kwarg('root_path', kwargs, None)
keyword_matches = [kw for kw in path_list if keyword in kw]
pruned_paths = []
for path in keyword_matches:
pruned_path = self.prune_path(path=path, prune_to=keyword, root_path=root_path)
if pruned_path not in pruned_paths:
self.logger.debug('Adding Path to Pruned Paths: {}'
.format(pruned_path))
pruned_paths.append(pruned_path)
return pruned_paths
def set_as_board(self, **kwargs):
top_level_dependency = funcs.get_kwarg('top_level_dependency', kwargs, None)
board_priority_list = funcs.get_kwarg('board_priority_list', kwargs,
['vendor', 'tool_version', 'family', 'device'])
board_settings_file = False
self.logger.info('.' * 80)
self.logger.info('Additional Board Processing...')
if self.settings_dir:
self.logger.debug('Board Settings Path: {settings_dir}'
.format(settings_dir=self.settings_dir))
for root, subpath, files in os.walk(self.settings_dir):
for f in files:
try:
if self.board_settings in f:
board_settings_file = os.path.join(root, f)
self.logger.info('Found Settings File for Board: {f}'
.format(f=board_settings_file))
break
except AttributeError:
if 'default_settings' in os.path.splitext(f):
board_settings_file = os.path.join(root, f)
self.logger.info('Found Default Settings File for Board: {f}'
.format(f=board_settings_file))
break
if board_settings_file:
with open(board_settings_file, 'r') as ymlfile:
setattr(self, 'board_settings', yaml.load(ymlfile))
self.logger.info('Loaded Board Settings from YAML File: {f}'
.format(f=board_settings_file))
try:
board_tool_version = fpgavendor_iface.resolve_tool_version(tool_version=self.board_settings['tool_version'],
vendor=self.board_settings['vendor'])
self.board_settings['tool_version'] = board_tool_version
except KeyError:
pass
self.logger.info('Using Settings from Board...')
for key in board_priority_list:
if key in self.board_settings:
if self.board_settings[key] == getattr(top_level_dependency, key, False):
self.logger.debug('[{key}] Project Setting: {proj} Matches Board Setting: {board}.'
.format(key=key,
proj=getattr(top_level_dependency, key, False),
board=self.board_settings[key]))
setattr(self, key, self.board_settings[key])
else:
self.logger.warning('[{key}] Project Setting: {proj} Does Not Matches Board Setting: '
'{board}. Using Board Setting.'
.format(key=key,
proj=getattr(top_level_dependency, key, False),
board=self.board_settings[key]))
setattr(self, key, self.board_settings[key])
for key in self.board_settings:
if not hasattr(self, key):
self.logger.info('[{key}] Adding Setting from: {board_settings_file}'
.format(key=key, board_settings_file=board_settings_file))
setattr(self, key, self.board_settings[key])
if hasattr(self, 'ip'):
setattr(self, 'has_vendor_ip', True)
else:
setattr(self, 'has_vendor_ip', False)
if not hasattr(self, 'board_id'):
setattr(self, 'board_id', 'X"0000_0000"')
if not hasattr(self, 'board_timestamp_enabled'):
setattr(self, 'board_timestamp_enabled', False)
[docs]class VendorIpDependency(object):
"""Vendor IP is an Vendor IP object containing methods and attributes required for processing ``vendor_ip``
Keyword Args:
**repo_root (str): Absolute path to the system environment ``$REPO_ROOT``.
**master_vendor_ip (obj): A project dependency object for the *master* ``vendor_ip`` dependency for
the project.
**vendor (str): FPGA Vendor. Used to determine ``vendor_ip`` locations.
Valid values: ``'xilinx'`` and ``altera'``. Default value: ``'xilinx'``
**tool_version (str): FPGA Tool Version. Used to determine ``vendor_ip`` locations.
**family (str): FPGA Family. Used to determine ``vendor_ip`` locations.
**device (str): FPGA Device. Used to determine ``vendor_ip`` locations.
**ip_dict (dict):
**settings_file (str): Absolute path to settings YAML file, used in logs.
"""
def __init__(self, **kwargs):
self.logger = log.config_logger(name=__name__, class_name=self.__class__.__name__)
setattr(self, 'repo_root', funcs.get_kwarg('repo_root', kwargs, None))
setattr(self, 'vendor', funcs.get_kwarg('vendor', kwargs, 'xilinx'))
setattr(self, 'tool_version', funcs.get_kwarg('tool_version', kwargs, None))
setattr(self, 'family', funcs.get_kwarg('family', kwargs, None))
setattr(self, 'device', funcs.get_kwarg('device', kwargs, None))
ip_dict = funcs.get_kwarg('ip_dict', kwargs, dict())
setattr(self, 'settings_file', funcs.get_kwarg('settings_file', kwargs, ''))
master_vendor_ip = funcs.get_kwarg('master_vendor_ip', kwargs, None)
master_layout_list = list()
if master_vendor_ip:
master_layout_list = self.resolved_master_tags(paths=master_vendor_ip.layout_list,
vendor=self.vendor,
tool_version=self.tool_version,
family=self.family,
device=self.device)
for p in master_layout_list:
self.logger.info('(Unresolved) Master Vendor IP Path: {p}'.format(p=p))
resolved_ip = dict()
for cat, subcats in ip_dict.items():
if isinstance(subcats, (dict, )):
for subcat, v in subcats.items():
if isinstance(v, (dict, )):
try:
if v['enabled']:
resolved_ip[subcat] = {'ipcat': cat,
'ipsubcat': '',
'lib': v['library'],
'wrapper_file': v['wrapper_file']}
self.logger.debug('No Subcategory!: {cat}|{name}'
.format(cat=cat,
name=subcat))
except KeyError:
for n, s in v.items():
try:
if s['enabled']:
resolved_ip[n] = {'ipcat': cat,
'ipsubcat': subcat,
'lib': s['library'],
'wrapper_file': s['wrapper_file']}
self.logger.debug('Subcategory!: {cat}|{subcat}|{name}'
.format(cat=cat,
subcat=subcat,
name=n))
except KeyError:
self.logger.critical('Vendor IP Category/Sub-Category too Deep: '
'{cat}|{subcat}|{name}. Skipping'
.format(cat=cat,
subcat=subcat,
name=n))
for k, v in resolved_ip.items():
paths = list()
for p in master_layout_list:
resolved_path = p.replace('<NAME>', k)
resolved_path = resolved_path.replace('<IPCAT>', v['ipcat'])
resolved_path = resolved_path.replace('<IPSUBCAT>', v['ipsubcat'])
paths.append(os.path.abspath(resolved_path))
for p in master_layout_list:
if 'common_wrapper_files' in p:
try:
wrapper_name = v['wrapper_file'].replace('_wrapper', '')
resolved_path = p.replace('<NAME>', wrapper_name)
resolved_path = resolved_path.replace('<IPCAT>', v['ipcat'])
resolved_path = resolved_path.replace('<IPSUBCAT>', v['ipsubcat'])
if os.path.abspath(resolved_path) not in paths:
paths.append(os.path.abspath(resolved_path))
except AttributeError:
pass
self.logger.debug('All Found Paths for {ip}:'
.format(ip=k))
for path in paths:
self.logger.info('(Resolved) IP Search Path: {path}'
.format(path=path))
for path in paths:
for root, subpaths, filenames in os.walk(path):
self.logger.debug('Subpaths: {subpaths}'.format(subpaths=subpaths))
top_level_paths = [os.path.join(root, sp) for sp in subpaths]
for top in top_level_paths:
self.logger.debug('\tTop-Level Path: {top}'
.format(top=top))
top_dir_str = ProjectDependency.top_dir_lookup(dir_name=os.path.split(top)[1])
if top_dir_str in resolved_ip[k]:
resolved_ip[k][top_dir_str].append(top)
else:
resolved_ip[k][top_dir_str] = [top]
break
setattr(self, 'enabled_ip', resolved_ip)
if __name__ == '__main__':
logger.info("projectmanager {} {}"
.format(__version__, version.revision))
log.sect_break(logger)
project = RepoFlow()
else:
logger.info(__str__)