...
 
Commits (4)
# -*- coding: utf-8 -*-
import collections
import functools
import itertools
import logging
from . import check
from . import core
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
# optimization code, tree construction
def getPermutations(matrix):
"""return an iterator of all the crossings to perform"""
size = len(matrix)
for i in range(size):
for j in range(i):
# yield as many times as the strands cross
yield from itertools.repeat((j, i), abs(matrix[i][j]))
def _pairwise(iterable):
# see https://docs.python.org/3/library/itertools.html#itertools-recipes
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
def getNeighbours(position):
"""return an iterator of the neighbour pairs"""
for left, right in _pairwise(position):
if left < right:
yield left, right
else:
yield right, left
def detectDoubles(permutationlist):
"""check whether permutationList contains multiple crossings for the same strand"""
already_permuted = set()
for perm in permutationlist:
for strand in perm:
if strand in already_permuted:
return True # strand is permuted more than once: double
else:
already_permuted.add(strand)
return False
def all_subsets(position):
combinator = functools.partial(
itertools.combinations,
list(getNeighbours(position))
)
subsets = itertools.chain.from_iterable(
map(
combinator,
reversed(range(1, len(position) // 2 + 1)) # give priority to subsets with many permutations
)
)
return subsets
def updatePermutationList(permList, toRemove):
out = list(permList) # do not alter original, permList is a list of tuple
for permutation in toRemove:
out.remove(permutation)
return out
def updatePosition(position, toPermute):
out = list(position) # do not alter original, position is a list of int
for perm in toPermute:
ind1 = position.index(perm[0])
ind2 = position.index(perm[1])
out[ind1], out[ind2] = out[ind2], out[ind1]
return out
class Node:
def __init__(self, permutationList, currentPosition, parent=None, depth=0, transition=None):
self.currentPosition = currentPosition
self.permutationList = permutationList
self.incomingTransition = transition
self.parent = parent
self.children = []
self.depth = depth
def addChild(self, child):
self.children.append(child)
def getPossibleTransitions(self):
transitions = []
potential_transitions = itertools.filterfalse(
detectDoubles, # remove transitions permuting the same strand many times
all_subsets(self.currentPosition)
)
for subset in potential_transitions:
for perm in subset:
if perm not in self.permutationList:
# perm is not a valid permutation: subset is not a valid
# transition
break
else:
# all permutations in subset are in self.permutionList: subset
# is a valid transition
transitions.append(subset)
assert sorted(transitions, key=len, reverse=True) == transitions
return transitions
class Tree:
def __init__(self):
self.structure = []
self.leaf = None
def addNode(self, node):
self.structure.append(node)
def getAtDepth(self, depth):
if len(self.structure) == 0:
return []
atDepth = []
for node in self.structure:
if node.depth == depth:
atDepth.append(node)
return atDepth
def getShortestPaths(self):
try:
if len(self.structure) == 0:
return []
endpoint = self.leaf
path = [endpoint.incomingTransition]
node = endpoint
while node.parent != None:
node = node.parent
if node.incomingTransition != None:
path.insert(0, node.incomingTransition)
logger.info('Shortest template')
for level, step in enumerate(path, start=1):
logger.info(f' Level {level}: {", ".join(map(str, sorted(step)))}')
return path
except AttributeError:
return []
@classmethod
def from_matrix(cls, matrix):
assert check.is_linking(matrix)
tree = cls()
initialPosition = list(range(len(matrix)))
permList = list(getPermutations(matrix))
root = Node(permList, initialPosition)
logger.info(f'Maximum possible template length: {len(permList)}')
level = 0
lifo = collections.deque()
lifo.append((root, level))
finalPosition = core.final_position(matrix)
while lifo:
head = lifo.popleft()
if head[1] != level:
logger.info(f' Exploring depth {head[1]} of the permutation tree')
node, level = head
tree.addNode(node)
for transition in node.getPossibleTransitions():
child = Node(updatePermutationList(node.permutationList, transition),
updatePosition(node.currentPosition, transition), node, node.depth + 1, transition)
node.addChild(child)
lifo.append((child, level + 1))
if len(child.permutationList) == 0 and finalPosition == child.currentPosition:
tree.leaf = child
return tree
return tree
......@@ -4,7 +4,8 @@ import itertools as _itertools
import svgwrite as _svgwrite
from . import export
from . import core as _core
from . import export as _export
COLORSET = (
......@@ -298,7 +299,7 @@ class _SVGDrawer:
sprite = 'no-torsion'
elif torque > 0:
sprite = 'pos-torsion'
else: # torque < 0
else: # torque < 0
sprite = 'neg-torsion'
shape = self._use_sprite(sprite, strand, position)
......@@ -416,13 +417,14 @@ class _SVGDrawer:
self.dwg.add(shape)
class SVGExporter(export.Exporter, alias='svg'):
class SVGExporter(_export.Exporter, alias='svg'):
def __init__(self, colorset=COLORSET):
self.positions = None
self.drawer = None
self.colorset = colorset
def _palette(self, size, color=True):
# pylint: disable=no-else-return
if color:
# cycle through COLORS as long as needed
return _itertools.islice(_itertools.cycle(self.colorset), size)
......@@ -435,6 +437,7 @@ class SVGExporter(export.Exporter, alias='svg'):
@staticmethod
def _decrease_torsion_torque(torque):
# pylint: disable=no-else-return
if torque == 0:
return 0
elif torque < 0:
......@@ -469,9 +472,8 @@ class SVGExporter(export.Exporter, alias='svg'):
self.drawer.depth += CROSSING_HEIGHT
def _export_layering(self, size):
final_order = [None] * size
for strand, position in enumerate(self.positions):
final_order[position] = strand
# paint layers from left to right (according to order, not position)
final_order = _core.convert_order_position(self.positions)
for position, strand in enumerate(final_order):
self.drawer.draw_layer(strand, position, size)
self.drawer.depth += LAYERING_HEIGHT
......
......@@ -4,9 +4,9 @@
Logic to check the validity of linking matrices.
"""
import itertools
import itertools as _itertools
from . import core
from . import core as _core
def is_square(matrix):
......@@ -59,7 +59,7 @@ def _neighbors_criterion(matrix):
for i in range(2, size)
for j in range(i - 1)
)
neighbor_pairs = itertools.chain(
neighbor_pairs = _itertools.chain(
orthogonal_pairs,
diagonal_pairs,
antidiagonal_pairs
......@@ -70,7 +70,7 @@ def _neighbors_criterion(matrix):
def _final_position_criterion(matrix):
"""Check whether the final position of strands is a genuine permutation."""
size = len(matrix)
return set(core.final_position(matrix)) == set(range(size))
return set(_core.final_position(matrix)) == set(range(size))
def is_continuous(matrix):
......@@ -148,7 +148,7 @@ def _planarity_criterion(matrix):
stack.append(interval)
return not bool(stack) # all intervals must be closed
final_position_ = core.final_position(matrix)
final_position_ = _core.final_position(matrix)
even_intervals = _intervals_sequence(final_position_, parity=0)
odd_intervals = _intervals_sequence(final_position_, parity=1)
return (
......
......@@ -4,20 +4,19 @@
Logic to handle the Command Line Interface (CLI).
"""
import argparse
import json
import logging
import logging.config
import os
import os.path
import textwrap
from . import main
from . import __name__ as pkgname # name of the main package (i.e., the app)
import argparse as _argparse
import json as _json
import logging as _logging
import logging.config as _logging_config
import os as _os
import textwrap as _textwrap
from . import main as _main
from . import __name__ as _pkgname # name of the main package (i.e., the app)
from . import __version__
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger = _logging.getLogger(__name__) # pylint: disable=invalid-name
# XXX: SCALE_BOUNDS constant tuple introduces a strong coupling between this
......@@ -39,29 +38,29 @@ def _bounded_float(inf, sup):
return value
else:
msg = '{} does not fall within [{}, {}].'.format(value, inf, sup)
raise argparse.ArgumentTypeError(msg)
raise _argparse.ArgumentTypeError(msg)
return _type
def _create_parser():
parser = argparse.ArgumentParser(
prog=pkgname,
formatter_class=argparse.RawDescriptionHelpFormatter,
parser = _argparse.ArgumentParser(
prog=_pkgname,
formatter_class=_argparse.RawDescriptionHelpFormatter,
description='Draw the templates of chaotic attractors.',
epilog=textwrap.dedent(
epilog=_textwrap.dedent(
f'''
To read a matrix from a file whose name starts with a '-' for example
'-foo.json', use one of these commands:
{pkgname} -- -foo.json
{_pkgname} -- -foo.json
{pkgname} ./-foo.json
{_pkgname} ./-foo.json
'''
),
)
parser.add_argument(
'--version',
action='version',
version='{} {}'.format(pkgname, __version__),
version='{} {}'.format(_pkgname, __version__),
)
parser.add_argument(
'-s', '--scale',
......@@ -111,17 +110,17 @@ def _setup_logging():
See https://docs.python.org/3/library/logging.config.html for more details
on the configuration of the logging module.
"""
log_cfg_filename = os.getenv('CATE_LOG_CFG')
if log_cfg_filename is not None and os.path.isfile(log_cfg_filename):
log_cfg_filename = _os.getenv('CATE_LOG_CFG')
if log_cfg_filename is not None and _os.path.isfile(log_cfg_filename):
with open(log_cfg_filename, mode='rt') as log_cfg_fd:
log_cfg = json.load(log_cfg_fd)
logging.config.dictConfig(log_cfg)
log_cfg = _json.load(log_cfg_fd)
_logging_config.dictConfig(log_cfg)
else:
logging.basicConfig(
_logging.basicConfig(
# levelname is padded with spaces to the length of the longest levelname
format='[{levelname:^8}] {message}',
style='{',
level=logging.INFO,
level=_logging.INFO,
)
......@@ -132,4 +131,4 @@ def cli():
options = vars(parser.parse_args()) # read argparse.Namespace as a dict
logger.debug(f'parsed arguments: {options}')
infile = options.pop('matrix') # extract positional argument from options
main.run(infile, **options)
_main.run(infile, **options)
......@@ -4,19 +4,32 @@
Core routines and optimization logic.
"""
import json
import logging
import collections as _collections
import itertools as _itertools
import json as _json
import logging as _logging
from . import check
from . import _legacy # XXX get rid of this!
import multiset as _multiset
from . import check as _check
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger = _logging.getLogger(__name__) # pylint: disable=invalid-name
def convert_order_position(vector):
"""Convert an order vector into a position vector, and vice-versa."""
# order vectors and position vectors are dual objects, see the definition
# of final_position for a more detailed description
dual = [None] * len(vector)
for i, item in enumerate(vector):
dual[item] = i
return dual
def final_position(matrix):
"""Compute the final position of strands according to the linking matrix."""
assert check.is_symmetric(matrix)
assert _check.is_symmetric(matrix)
size = len(matrix)
# compute the final order of strands using Melvin's algorithm
# given a strand `s` and an index `i`, we have `final_order[i] == s`
......@@ -30,21 +43,97 @@ def final_position(matrix):
final_order[i] += 1
# transform final_order to get the final position
# given a strand `s` and an index `i`, we have `final_position_[s] == i`
final_position_ = [None] * size
for position, strand in enumerate(final_order):
final_position_[strand] = position
return final_position_
return convert_order_position(final_order)
def _pairwise(iterable):
# see https://docs.python.org/3/library/itertools.html#itertools-recipes
a, b = _itertools.tee(iterable) # pylint: disable=invalid-name
next(b, None)
return zip(a, b)
class _OptimizerState:
"""Internal state of the optimization logic to compute crosslevels."""
def __init__(self, position, crossings):
self.position = position # current position of strands
self.crossings = crossings # remaining crossings
@staticmethod
def _adjacent_pairs(position):
"""Generate the pairs of adjacent strands, in increasing order."""
for left, right in _pairwise(position):
if left < right:
yield left, right
else:
yield right, left
@staticmethod
def _is_valid_transition(transition):
"""Return True iff every strand is used in at most one crossing."""
already_encountered = set()
for crossing in transition:
for strand in crossing:
if strand in already_encountered:
return False # strand is used more than once
else:
already_encountered.add(strand)
return True
def _valid_transitions(self):
"""Generate the valid transitions from self."""
# keep one occurence of each remaining crossings
crossings_set = set(self.crossings.distinct_elements())
# only consider crossings of adjacent strands
crossings_set.intersection_update(self._adjacent_pairs(self.position))
# yield valid transitions with decreasing number of crossings
for size in reversed(range(1, len(crossings_set) + 1)):
yield from filter(
self._is_valid_transition,
_itertools.combinations(crossings_set, size)
)
@staticmethod
def _apply_transition(position, transition):
"""Compute the position reached by applying transition to position."""
order = convert_order_position(position)
new_position = list(position) # do not alter original position
for left, right in transition:
new_position[order[left]], new_position[order[right]] = \
new_position[order[right]], new_position[order[left]]
order[left], order[right] = order[right], order[left]
return tuple(new_position)
def next_states(self):
"""
Generate the tuples (state, transition) for reachable states from self.
"""
for transition in self._valid_transitions():
new_position = self._apply_transition(self.position, transition)
new_crossings = self.crossings.difference(transition)
new_state = type(self)(new_position, new_crossings)
yield new_state, transition
def __eq__(self, other):
return self.position == other.position and \
self.crossings == other.crossings
def __hash__(self):
return hash((self.position, self.crossings))
class Template:
def __init__(self, matrix):
if not check.is_linking(matrix):
if not _check.is_linking(matrix):
raise ValueError('Invalid linking matrix')
self.matrix = matrix
self.__crosslevels = None
@classmethod
def from_json(cls, fp):
def from_json(cls, fp): # pylint: disable=invalid-name
"""
Create a template by loading a linking matrix from a JSON file.
......@@ -62,8 +151,8 @@ class Template:
# interpret the input as a JSON file
try:
matrix = json.load(fp)
except json.JSONDecodeError as err:
matrix = _json.load(fp)
except _json.JSONDecodeError as err:
raise TypeError('Malformed JSON') from err
# check the loaded input JSON structure is compatible with a linking
......@@ -91,12 +180,55 @@ class Template:
"""Number of (oriented) torsions for each strand of the template."""
return [self.matrix[i][i] for i in range(len(self.matrix))]
@property
def crossings(self):
"""Mapping of (crossing, arity) for each crossing of the template."""
return {
(j, i): abs(self.matrix[i][j])
for i in range(len(self.matrix))
for j in range(i)
if self.matrix[i][j] # keep only non-zero crossings
}
def _optimize_crosslevels(self):
"""Compute a sequence of crossing levels of minimum depth."""
initial_state = _OptimizerState(
tuple(range(self.size)),
_multiset.FrozenMultiset(self.crossings)
)
final_state = _OptimizerState(
tuple(final_position(self.matrix)),
_multiset.FrozenMultiset()
)
# depth tracks the number of crosslevels
# path tracks the previous state
# how tracks the transition used from the previous state
depth, path, how = {initial_state: 0}, {}, {}
lifo = _collections.deque()
lifo.append(initial_state)
while lifo:
current_state = lifo.popleft()
for new_state, transition in current_state.next_states():
if new_state not in depth:
depth[new_state], path[new_state], how[new_state] = \
depth[current_state] + 1, current_state, transition
lifo.append(new_state)
crosslevels = [None] * depth[final_state]
current_state = final_state
for depth in reversed(range(depth[final_state])):
crosslevels[depth] = how[current_state]
current_state = path[current_state]
return crosslevels
@property
def crosslevels(self):
"""Level-by-level list of concurrent crossings of the template."""
if self.__crosslevels is None:
logger.info('Starting optimization of template depth')
optimizer = _legacy.Tree.from_matrix(self.matrix)
self.__crosslevels = optimizer.getShortestPaths()
self.__crosslevels = self._optimize_crosslevels()
logger.info('Finished optimization of template depth')
return self.__crosslevels
# -*- coding: utf-8 -*-
import logging
import sys
import logging as _logging
import sys as _sys
from . import core
from . import export
from . import core as _core
from . import export as _export
logger = logging.getLogger(__name__) # pylint: disable=invalid-name
logger = _logging.getLogger(__name__) # pylint: disable=invalid-name
def run(infile, *, output, color=True, complete_flow=False, scale=1.0):
try:
if infile == '-': # the special argument '-' means stdin
template = core.Template.from_json(sys.stdin)
template = _core.Template.from_json(_sys.stdin)
else:
with open(infile) as fp:
template = core.Template.from_json(fp)
with open(infile) as fp: # pylint: disable=invalid-name
template = _core.Template.from_json(fp)
except TypeError:
logger.error("Invalid JSON input")
exit(code=1)
......@@ -33,18 +33,18 @@ def run(infile, *, output, color=True, complete_flow=False, scale=1.0):
logger.info(f' {row}')
logger.info("Starting creation of the SVG template")
exporter = export.SVGExporter()
exporter = _export.SVGExporter()
try:
if output == '-': # the special argument '-' means stdout
exporter.write(
template,
output=sys.stdout,
output=_sys.stdout,
color=color,
complete_flow=complete_flow,
scale=scale
)
else:
with open(output, mode='w') as fp:
with open(output, mode='w') as fp: # pylint: disable=invalid-name
exporter.write(
template,
output=fp,
......