Source code for aquaduct.geom.cluster
# -*- coding: utf-8 -*-
# Aqua-Duct, a tool facilitating analysis of the flow of solvent molecules in molecular dynamic simulations
# Copyright (C) 2016-2018 Tomasz Magdziarz, Alicja Płuciennik, Michał Stolarczyk <info@aquaduct.pl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
This module provides functions for clusterization.
Clusterization is done by :mod:`scikit-learn` module.
"""
import numpy as np
from sklearn.cluster import Birch, DBSCAN, AffinityPropagation, KMeans, MeanShift, estimate_bandwidth
from scipy.spatial.distance import cdist
# problems with clustering methods and size of set
# DBSCAN: n > 0
# AffinityPropagation: n > 0
# KMeans: n > clusters
# MeanShift: n > 6
AVAILABLE_METHODS = ['dbscan','kmeans', 'affprop', 'meanshift', 'birch','barber']
from aquaduct.utils.helpers import Auto
from aquaduct.utils import clui
from aquaduct.traj.barber import WhereToCut
[docs]class BarberCluster(object):
'''
Wrapper class that implements *barber* clusterization.
'''
[docs] def fit(self, coords, spheres=None):
'''
:param Iterable coords: Input coordinates of points to be clustered.
:param Iterable spheres: Input spheres for each point.
'''
wtc = WhereToCut()
wtc.spheres = spheres
clouds = wtc.cloud_groups(progress=True)
# clouds to labels!
labels = np.zeros(len(spheres))
spheres_id = [s.nr for s in spheres]
for cloud_id,cloud in clouds.iteritems():
labels[[spheres_id.index(c) for c in cloud]] = cloud_id
return BarberClusterResult(labels)
[docs]def MeanShiftBandwidth(X, **kwargs):
'''
Helper function for automatic calculation of a bandwidth for MeanShift method.
:param Iterable X: Coordinates of points to be clustered.
'''
if 'bandwidth' in kwargs:
if kwargs['bandwidth'] is Auto:
bandwidth = estimate_bandwidth(np.array(X), quantile=0.5) # TODO: change it to the default value of 0.3 or use it as option?
kwargs.update({'bandwidth': bandwidth})
clui.message("Meanshift automatic bandwidth calculation: bandwidth = %f" % float(
bandwidth)) # TODO: make it properly
return kwargs
[docs]class PerformClustering(object):
'''
Helper class for clusterization.
'''
# aqeuduct clustering helper class
[docs] def __init__(self, method, **kwargs):
'''
:param object method: Class that implements cclusterization via *fit* method.
'''
self.method = method
self.method_kwargs = kwargs
self.method_results = None
self.clusters = None
[docs] def __call__(self, coords, spheres=None):
# compatibility
return self.fit(coords, spheres=spheres)
[docs] def fit(self, coords, spheres=None):
'''
:param Iterable coords: Input coordinates of points to be clustered.
:param Iterable spheres: Input spheres for each point. Optional, important only if :attr:`method` is :class:`BarberCluster`.
:return: Clusters numbers.
:rtype: list of int
'''
# spheres are used for Barber only
if len(coords) < 2:
self.clusters = self._get_noclusters(len(coords))
return self.clusters
# special cases
if self.method is BarberCluster:
method = self.method()
self.method_results = method.fit(coords, spheres)
self.clusters = map(int, self.method_results.labels_ + 1)
else:
if self.method is MeanShift:
if len(coords) < 6:
self.clusters = self._get_noclusters(len(coords))
clui.message("Number of objects %d < 6 is too small for MeanShift method. Skipping." % (len(coords)))
return self.clusters
method = self.method(**MeanShiftBandwidth(coords, **self.method_kwargs))
else:
if self.method is KMeans:
if 'n_clusters' in self.method_kwargs:
if len(coords) < self.method_kwargs['n_clusters']:
self.clusters = self._get_noclusters(len(coords))
clui.message(
"Number of objects %d < %d is too small for KMeans method. Skipping." % (
len(coords), self.method_kwargs['n_clusters']))
return self.clusters
method = self.method(**self.method_kwargs)
self.method_results = method.fit(coords)
self.clusters = map(int, self.method_results.labels_ + 1)
return self.clusters
[docs] def centers(self):
'''
:return: Centers of clusters.
'''
assert self.clusters is not None, "Perform clusterization first."
assert self.method_results is not None, "Perform clusterization first."
if hasattr(self.method_results, 'cluster_centers_'):
return self.method_results.cluster_centers_
raise NotImplementedError('Cluster centers is not implemented for %r method yet.' % self.method)