# -*- coding: utf-8 -*-
# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2016-2017 Tomasz Magdziarz, Alicja Płuciennik, Michał Stolarczyk <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
logger = logging.getLogger(__name__)
from aquaduct.utils import clui
import numpy as np
from scipy.spatial.distance import cdist
from aquaduct.geom import traces
from aquaduct.traj.inlets import Inlet, InletTypeCodes
from aquaduct.utils.helpers import is_number, lind, SmartRange
from aquaduct.utils.helpers import tupleify, listify, arrayify1
from aquaduct.utils.maths import make_default_array
########################################################################################################################
# paths/list manipulations
# following part of code comes directly from the very initial tcl/vmd implementation
# all following functions should return list
# functions union_smartr and xor_smartr were added later in python implementation to speed up calculations
[docs]def union_full(a, b):
return [aa for aa in a if aa in b]
[docs]def union_smartr(a, b):
b_ = SmartRange(b)
return [aa for aa in a if b_.isin(aa)]
[docs]def union(a, b, smartr=True):
if smartr:
return union_smartr(a, b)
return union_full(a, b)
[docs]def glue(a, b):
if a[-1] >= b[0]:
g = list(set(a + b))
g.sort()
return g
return []
[docs]@listify
def xor_full(a, b):
ab = union(a, b)
for e in glue(a, b):
if e not in ab:
yield e
[docs]@listify
def xor_smartr(a, b):
ab = SmartRange(union_smartr(a, b))
for e in glue(a, b):
if not ab.isin(e):
yield e
[docs]def xor(a, b, smartr=True):
if smartr:
return xor_smartr(a, b)
return xor_full(a, b)
[docs]def left(a, b, smartr=True):
return union(a, xor(a, b, smartr=smartr), smartr=smartr)
[docs]def right(a, b, smartr=True):
return union(b, xor(a, b, smartr=smartr), smartr=smartr)
########################################################################################################################
[docs]class PathTypesCodes():
path_in_code = 'i'
path_object_code = 'c'
path_out_code = 'o'
path_walk_code = 'w'
[docs]class GenericPathTypeCodes():
object_name = 'c'
scope_name = 's'
out_name = 'n'
[docs]class GenericPaths(object, GenericPathTypeCodes):
# object to store paths... is it required?
[docs] def __init__(self, id_of_res, name_of_res=None, min_pf=None, max_pf=None):
# id is any type of object; it is used as identifier
self.id = id_of_res
if name_of_res is not None:
self.name = name_of_res
else:
self.name = 'UNK' # FIXME: magic constant
self.__types = SmartRange()
self.__frames = SmartRange()
self.coords = []
# following is required to correct in and out paths that begin or end in scope and
# begin or end at the very begining of MD or at very end of MD
self.max_possible_frame = max_pf
self.min_possible_frame = min_pf
# info methods
@property
def types(self):
return list(self.__types.get())
@property
def frames(self):
return list(self.__frames.get())
@property
def max_frame(self):
return self.__frames.max()
@property
def min_frame(self):
return self.__frames.min()
# add methods
[docs] def add_coord(self, coord):
# store coord as numpy float 32
self.coords.append(make_default_array(coord))
[docs] def add_object(self, frame):
self.add_type(frame, self.object_name)
[docs] def add_scope(self, frame):
self.add_type(frame, self.scope_name)
[docs] def add_type(self, frame, ftype):
self.__types.append(ftype)
self.__frames.append(frame)
[docs] def _gpt(self):
# get, I'm just passing through
n = len(self.__frames)
types = self.types
begin = 0
for block in self.__frames.raw:
end = begin + block.times
# get types of this block
block_frames = list(block.get())
block_types = types[begin:end]
begin = end
# now iterate over block_types in a search of out_name
if self.out_name in block_types:
while self.out_name in block_types:
to_yield = block_frames[:block_types.index(self.out_name)]
to_yield_types = block_types[:block_types.index(self.out_name)]
if len(to_yield) > 0:
if not self.object_name in to_yield_types:
yield to_yield
block_types = block_types[block_types.index(self.out_name):]
block_frames = block_frames[block_types.index(self.out_name):]
if len(block_frames) > 0:
if len(block_frames) > 0:
if not self.object_name in block_types:
yield block_frames
[docs] def _gpo(self):
n = len(self.__frames)
types = self.types
begin = 0
for block in self.__frames.raw:
end = begin + block.times
# get types of this block
block_frames = list(block.get())
block_types = types[begin:end]
begin = end
# now iterate over block_types in a search of out_name
if self.out_name in block_types:
while self.out_name in block_types:
to_yield = block_frames[:block_types.index(self.out_name)]
to_yield_types = block_types[:block_types.index(self.out_name)]
if len(to_yield) > 0:
while to_yield_types[0] != self.object_name:
to_yield.pop(0)
to_yield_types.pop(0)
if len(to_yield) == 0:
break
if len(to_yield) > 0:
yield to_yield
block_types = block_types[block_types.index(self.out_name):]
block_frames = block_frames[block_types.index(self.out_name):]
if len(block_frames) > 0:
while block_types[0] != self.object_name:
block_frames.pop(0)
block_types.pop(0)
if len(block_frames) == 0:
break
if len(block_frames) > 0:
yield block_frames
[docs] def _gpi(self):
n = len(self.__frames)
types = self.types
begin = 0
for block in self.__frames.raw:
end = begin + block.times
# get types of this block
block_frames = list(block.get())
block_types = types[begin:end]
begin = end
# now iterate over block_types in a search of out_name
if self.out_name in block_types:
while self.out_name in block_types:
to_yield = block_frames[:block_types.index(self.out_name)]
to_yield_types = block_types[:block_types.index(self.out_name)]
if len(to_yield) > 0:
while to_yield_types[-1] != self.object_name:
to_yield.pop(-1)
to_yield_types.pop(-1)
if len(to_yield) == 0:
break
if len(to_yield) > 0:
yield to_yield
block_types = block_types[block_types.index(self.out_name):]
block_frames = block_frames[block_types.index(self.out_name):]
if len(block_frames) > 0:
while block_types[-1] != self.object_name:
block_frames.pop(-1)
block_types.pop(-1)
if len(block_frames) == 0:
break
if len(block_frames) > 0:
yield block_frames
[docs] def get_paths_in(self):
return self._gpi()
[docs] def get_paths_out(self):
return self._gpo()
[docs] def find_paths(self, fullonly=False, smartr=True):
# this looks for normal, ie containing 'core' paths
paths_out = list(self.get_paths_out())
paths_in = list(self.get_paths_in())
for path_in in paths_in:
path_out = paths_out[0]
path_glue = glue(path_in, path_out)
if len(path_glue) > 0:
path_out = paths_out.pop(0)
path_core = union(path_in, path_out, smartr=smartr)
path_in = left(path_in, path_out, smartr=smartr)
path_out = right(path_core, path_out, smartr=smartr)
else:
path_core = []
path_out = []
if len(path_in) == 0 or len(path_core) == 0 or len(path_out) == 0:
if fullonly:
continue
# TODO: make separate method/function for that
# correct for max/min possible frame
if self.max_possible_frame is not None:
if len(path_out) > 0:
if path_out[-1] >= self.max_possible_frame:
path_core += path_out
path_out = []
if self.min_possible_frame is not None:
if len(path_in) > 0:
if path_in[0] <= self.min_possible_frame:
path_core = path_in + path_core
path_in = []
yield path_in, path_core, path_out
if not fullonly:
path_in = []
path_core = []
for path_out in paths_out:
# correct for max/min possible frame
if self.max_possible_frame is not None:
if len(path_out) > 0:
if path_out[-1] >= self.max_possible_frame:
path_core += path_out
path_out = []
yield path_in, path_core, path_out
[docs] def find_paths_coords_types(self, fullonly=False):
for path in self.find_paths(fullonly=fullonly):
# yield path, self.get_single_path_coords(path), self.get_single_path_types(path)
coords, types = self.get_single_path_coords_types(path)
yield path, coords, types
for path in self._gpt():
coords, types = self.get_single_path_coords_types((path, [], []))
yield (path, [], []), coords, types
[docs] def get_single_path_coords_types(self, spath):
# returns typess for single path
# single path comprises of in,scope,out parts
# TODO: join it with get_single_path_coords
p_in, p_object, p_out = spath
frames = self.frames
types = self.types
def get_be(p_):
# if len(p_) == 1:
# quit return 0,1
return frames.index(p_[0]), frames.index(p_[-1]) + 1
in_t = []
object_t = []
out_t = []
in_c = []
object_c = []
out_c = []
if len(frames) == len(types):
# not full trajectory
# p_in
if len(p_in) > 0:
b, e = get_be(p_in)
in_t = types[b:e]
in_c = self.coords[b:e]
# p_object
if len(p_object) > 0:
b, e = get_be(p_object)
object_t = types[b:e]
object_c = self.coords[b:e]
# p_out
if len(p_out) > 0:
b, e = get_be(p_out)
out_t = types[b:e]
out_c = self.coords[b:e]
else:
# full trajectory
# p_in
if len(p_in) > 0:
b, e = p_in[0], p_in[-1] + 1
in_t = types[b:e]
in_c = self.coords[b:e]
# p_object
if len(p_object) > 0:
b, e = p_object[0], p_object[-1] + 1
object_t = types[b:e]
object_c = self.coords[b:e]
# p_out
if len(p_out) > 0:
b, e = p_out[0], p_out[-1] + 1
out_t = types[b:e]
out_c = self.coords[b:e]
return (in_c, object_c, out_c), (in_t, object_t, out_t)
[docs] def barber_with_spheres(self, spheres):
# calculate big distance matrix
# distances = cdist(self.coords, [s.center for s in spheres],metric='euclidean')
# compare with radii
# tokeep = distances > np.matrix([[s.radius for s in spheres]])
if len(spheres):
tokeep = np.argwhere((cdist(self.coords, [s.center for s in spheres], metric='euclidean') > np.matrix(
[[s.radius for s in spheres]])).all(1).A1).flatten().tolist()
# tokeep = np.argwhere(tokeep).flatten().tolist()
self.coords = lind(self.coords, tokeep)
self.__types = SmartRange(lind(self.types, tokeep))
self.__frames = SmartRange(lind(self.frames, tokeep))
# SinglePathID = namedtuple('SinglePathID', 'id nr')
[docs]class SinglePathID(object):
[docs] def __init__(self, path_id=None, nr=None, name=None):
assert path_id is not None, "path_id connot be None."
self.id = path_id
assert nr is not None, "nr connot be None."
self.nr = nr
assert name is not None, "name connot be None."
self.name = name
[docs] def __str__(self):
# by default name is not returned
return '%d:%d' % (self.id, self.nr)
[docs] def __eq__(self, other):
if isinstance(other, SinglePathID):
return self.id == other.id and self.nr == other.nr and self.name == other.name
return False
[docs]def yield_single_paths(gps, fullonly=None, progress=None, passing=None):
# iterates over gps - list of GenericPaths objects and transforms them in to SinglePath objects
nr_dict = {}
for nr, gp in enumerate(gps):
path_id = gp.id
path_name = gp.name
with clui.tictoc('Processing path %d' % path_id):
for paths, coords, types in gp.find_paths_coords_types(fullonly=fullonly):
if path_id in nr_dict:
nr_dict.update({path_id: (nr_dict[path_id] + 1)})
else:
nr_dict.update({path_id: 0})
pid = SinglePathID(path_id=path_id, nr=nr_dict[path_id], name=path_name)
if len(paths[1]) > 0:
# if everything is OK
sp = SinglePath(pid, paths, coords, types)
else:
# this is passing through
if passing:
sp = PassingPath(pid, paths[0], coords[0], types[0])
sp.has_in = sp.paths_first_in > gp.min_possible_frame
sp.has_out = sp.paths_last_out < gp.max_possible_frame
else:
continue
if progress:
yield sp, nr
else:
yield sp
[docs]class MacroMolPath(object, PathTypesCodes, InletTypeCodes):
# special class
# represents one path
empty_coords = make_default_array(np.zeros((0, 3)))
[docs] def __init__(self, path_id, paths, coords, types):
self.id = path_id
# for paths use SmartRanges and then provide methods to read them
self.__path_in, self.__path_object, self.__path_out = map(SmartRange, paths)
# similarly, do it with types
# self.path_in, self.path_object, self.path_out = paths
self.__types_in, self.__types_object, self.__types_out = map(SmartRange, types)
# self.types_in, self.types_object, self.types_out = types
self.coords_in, self.coords_object, self.coords_out = map(make_default_array, coords)
self.smooth_coords_in, self.smooth_coords_object, self.smooth_coords_out = None, None, None
self.smooth_method = None
# return np.vstack([c for c in self._coords if len(c) > 0])
[docs] def is_single(self):
raise NotImplementedError("Implementation missing.")
[docs] def is_passing(self):
raise NotImplementedError("Implementation missing.")
[docs] def is_frame_in(self, frame):
return self.__path_in.isin(frame)
[docs] def is_frame_object(self, frame):
return self.__path_object.isin(frame)
[docs] def is_frame_out(self, frame):
return self.__path_out.isin(frame)
[docs] def is_frame_walk(self, frame):
return self.is_frame_in(frame) or self.is_frame_object(frame) or self.is_frame_out(frame)
@property
def path_in(self):
return list(self.__path_in.get())
@property
def path_object(self):
return list(self.__path_object.get())
@property
def path_out(self):
return list(self.__path_out.get())
@property
def types_in(self):
return list(self.__types_in.get())
@property
def types_object(self):
return list(self.__types_object.get())
@property
def types_out(self):
return list(self.__types_out.get())
# ---------------------------------------------------------------------------------------------------------------- #
@property
def coords_first_in(self):
if len(self.__path_in) > 0:
return self.coords_in[0]
@property
def paths_first_in(self):
if len(self.__path_in) > 0:
return self.path_in[0]
@property
def coords_last_out(self):
if len(self.__path_out) > 0:
return self.coords_out[-1]
@property
def paths_last_out(self):
if len(self.__path_out) > 0:
return self.path_out[-1]
@property
def coords_filo(self):
# first in and last out plus type!
return [(inlet, {0: self.incoming, 1: self.outgoing}[nr]) for nr, inlet in
enumerate((self.coords_first_in, self.coords_last_out)) if inlet is not None]
# ---------------------------------------------------------------------------------------------------------------- #
[docs] def get_inlets(self):
if self.has_in:
yield Inlet(coords=self.coords_in[0],
type=(InletTypeCodes.surface, InletTypeCodes.incoming),
reference=self.id,
frame=self.path_in[0])
yield Inlet(coords=self.coords_in[-1],
type=(InletTypeCodes.internal, InletTypeCodes.incoming),
reference=self.id,
frame=self.path_in[-1])
if self.has_out:
yield Inlet(coords=self.coords_out[0],
type=(InletTypeCodes.internal, InletTypeCodes.outgoing),
reference=self.id,
frame = self.path_out[0])
yield Inlet(coords=self.coords_out[-1],
type=(InletTypeCodes.surface, InletTypeCodes.outgoing),
reference=self.id,
frame=self.path_out[-1])
####################################################################################################################
# coords
@property
def coords(self):
return self.coords_in, self.coords_object, self.coords_out
@property
def coords_cont(self):
# returns coords as one array
return make_default_array(np.vstack([c for c in self.coords if len(c) > 0]))
####################################################################################################################
# paths
@property
def _paths(self):
return self.__path_in, self.__path_object, self.__path_out
@property
def paths(self):
return self.path_in, self.path_object, self.path_out
@property
def paths_cont(self):
pathsc = []
for p in self.paths:
pathsc += p
return pathsc
####################################################################################################################
# types
@property
def types(self):
# spath types
return ([self.path_in_code] * len(self.__path_in),
[self.path_object_code] * len(self.__path_object),
[self.path_out_code] * len(self.__path_out))
@property
def types_cont(self):
typesc = []
for t in self.types:
typesc += t
return typesc
@property
def gtypes(self):
# generic types
return self.types_in, self.types_object, self.types_out
@property
def gtypes_cont(self):
gtypesc = []
for t in self.gtypes:
gtypesc += t
return gtypesc
@property
@tupleify
def etypes(self):
# extended types
for t, g in zip(self.types, self.gtypes):
yield [''.join(t) for t in zip(t, g)]
@property
def etypes_cont(self):
return [''.join(t) for t in zip(self.types_cont, self.gtypes_cont)]
####################################################################################################################
@property
def size(self):
return sum(self.sizes)
@property
def sizes(self):
return map(len, self._paths)
@property
def begins(self):
return self.paths_cont[0]
@property
def ends(self):
return self.paths_cont[-1]
@property
def has_in(self):
return len(self.__path_in) > 0
@property
def has_object(self):
return len(self.__path_object) > 0
@property
def has_out(self):
return len(self.__path_out) > 0
####################################################################################################################
[docs] @tupleify
def get_coords(self, smooth=None):
# TODO: it is not used to get smooth coords but to get coords in general, conditionally smoothed
# if smooth is not none applies smoothing
if smooth is not None:
if smooth != self.smooth_method:
self.smooth_coords_in, self.smooth_coords_object, self.smooth_coords_out = self._make_smooth_coords(
self.coords_cont, smooth)
self.smooth_method = smooth
for nr, coords in enumerate((self.smooth_coords_in, self.smooth_coords_object, self.smooth_coords_out)):
if coords is None:
yield self.coords[nr]
else:
yield coords
else:
for coords in self.coords:
yield coords
[docs] @tupleify
def _make_smooth_coords(self, coords, smooth=None):
# if smooth is not none applies smoothing
# smooth should be callable and should return an object of length equal to submitted one
# get continuous coords
if smooth:
coords_smooth = smooth(coords)
else:
coords_smooth = coords
# now lets return tupple of coords
nr = 0
for path in self._paths:
if len(path) > 0:
yield make_default_array(coords_smooth[nr:nr + len(path)])
nr += len(path)
else:
yield self.empty_coords
[docs] def get_coords_cont(self, smooth=None):
# returns coords as one array
return make_default_array(np.vstack([c for c in self.get_coords(smooth) if len(c) > 0]))
[docs] def apply_smoothing(self, smooth):
# permament change!
self.coords_in, self.coords_object, self.coords_out = self._make_smooth_coords(self.coords_cont, smooth)
####################################################################################################################
[docs] def get_distance_cont(self, smooth=None, normalize=False):
length = make_default_array(
np.hstack((np.array([0]), np.cumsum(traces.diff(self.get_coords_cont(smooth=smooth))))))
if normalize:
if is_number(normalize):
norm_factor = float(normalize)
else:
norm_factor = max(length)
length = length / norm_factor
return length
[docs] def get_distance_rev_cont(self, *args, **kwargs):
length = self.get_distance_cont(*args, **kwargs)
return length[-1] - length
[docs] @arrayify1
def get_distance_both_cont(self, *args, **kwargs):
length = self.get_distance_cont(*args, **kwargs)
return (min(n, r) for n, r in zip(length, length[-1] - length))
[docs] @arrayify1
def get_velocity_cont(self, smooth=None, normalize=False):
distance = self.get_distance_cont(smooth=smooth, normalize=normalize)
return traces.derrivative(distance)
[docs] @arrayify1
def get_acceleration_cont(self, smooth=None, normalize=False):
velocity = self.get_velocity_cont(smooth=smooth, normalize=normalize)
return traces.derrivative(velocity)
[docs]class SinglePath(MacroMolPath):
[docs] def is_single(self):
return True
[docs] def is_passing(self):
return False
[docs]class PassingPath(MacroMolPath):
[docs] def __init__(self, path_id, path, coords, types):
self.id = path_id
self.__path = SmartRange(path)
self.__types = SmartRange(types)
self.__coords = make_default_array(coords)
self.smooth_coords = None
self.smooth_method = None
self.__has_in = True
self.__has_out = True
'''
self.has_in = True
self.has_out = True
'''
@property
def has_in(self):
return self.__has_in
@has_in.setter
def has_in(self, value):
self.__has_in = value
@property
def has_out(self):
return self.__has_out
@has_out.setter
def has_out(self, value):
self.__has_out = value
[docs] def is_single(self):
return False
[docs] def is_passing(self):
return True
[docs] def is_frame_walk(self, frame):
return self.__path.isin(frame)
@property
def types(self):
return ([self.path_walk_code] * self.size,)
@property
def gtypes(self):
# generic types
return (self.__types.get(),)
@property
def sizes(self):
return 0, len(self.__path), 0
@property
def _paths(self):
return (self.__path,)
@property
def coords(self):
return (self.__coords,)
@property
def path(self):
return list(self.__path.get())
@property
def paths(self):
return (self.path,)
@property
def coords_first_in(self):
return self.__coords[0]
@property
def paths_first_in(self):
return self.path[0]
@property
def coords_last_out(self):
return self.__coords[-1]
@property
def paths_last_out(self):
return self.path[-1]
[docs] @tupleify
def get_coords(self, smooth=None):
# TODO: it is not used to get smooth coords but to get coords in general, conditionally smoothed
# if smooth is not none applies smoothing
if smooth is not None:
if smooth != self.smooth_method:
self.smooth_coords = self._make_smooth_coords(self.__coords, smooth)[0]
self.smooth_method = smooth
for nr, coords in enumerate((self.smooth_coords,)):
if coords is None:
yield self.coords[nr]
else:
yield coords
else:
for coords in self.coords:
yield coords
[docs] def get_inlets(self):
# Lets assume that if max/min_possible_frame is there is no inlet
# how to check it?
if self.has_in:
yield Inlet(coords=self.__coords[0],
type=(InletTypeCodes.surface, InletTypeCodes.incoming),
reference=self.id,
frame=self.paths_first_in)
if self.has_out:
yield Inlet(coords=self.__coords[-1],
type=(InletTypeCodes.surface, InletTypeCodes.outgoing),
reference=self.id,
frame=self.paths_last_out)
####################################################################################################################
[docs]class MasterPath(MacroMolPath):
[docs] def __init__(self, sp):
super(MasterPath, self).__init__(sp.id, sp.paths, sp.coords, sp.gtypes)
self.width_cont = None
[docs] def add_width(self, width):
assert len(width) == self.size
self.width_cont = width