import random
import sys
from typing import List

from kmeans.clustering.cluster import Cluster
from kmeans.clustering.point import Point
from kmeans.clustering.geometry import dist


def mean_movement(clusters: List[Cluster]) -> float:
    """
    Returns the largest mean movement reported by any of the clusters.

    Each cluster is asked for its own movement via `mean_moved()`; the
    biggest value seen is returned. The result is 0 when `clusters` is
    empty or when no cluster reports a movement greater than 0.

    @param clusters The clusters whose mean movement should be inspected.
    """
    highest_movement = 0
    for cluster in clusters:
        # Query once per cluster rather than once for the comparison and
        # again for the assignment.
        moved = cluster.mean_moved()
        if moved > highest_movement:
            highest_movement = moved

    return highest_movement


def unweighted_k_means(points: List[Point], k: int,
                       d: float = 0.001) -> List[Cluster]:
    """
    Runs Lloyd's Algorithm for k-means clustering without weights.

    @param points The list of points to cluster.
    @param k The number of clusters.
    @param d The threshold distance where we will consider the means
             converged.
    @return The list of `k` clusters once the largest mean movement is no
            longer greater than `d`.
    @raises ValueError If a point cannot be assigned to any cluster (for
            example when `k` is 0 but `points` is not empty).
            `random.sample` also raises ValueError when `k` is negative
            or larger than `len(points)`.
    """
    # Our means are initialized to randomly chosen points from the set.
    means = random.sample(points, k)
    clusters = [Cluster(mean, [], None) for mean in means]

    mean_needs_moving = True

    # Refer to the mean_movement function. It simply returns the largest
    # mean movement of all clusters. Once the movement of all clusters is
    # no longer greater than `d` we consider the series converged and return.
    while mean_needs_moving:
        # Compare each point to every cluster's centroid and calculate the
        # distance to it. Assign to the nearest cluster.
        for point in points:
            best_dist = float("inf")
            best_cluster = None

            for cluster in clusters:
                # Deliberately NOT named `d`: that would overwrite the
                # convergence threshold parameter used at the bottom of
                # the loop.
                candidate_dist = dist(point, cluster.mean)
                if candidate_dist < best_dist:
                    best_dist = candidate_dist
                    best_cluster = cluster

            if best_cluster is None:
                raise ValueError(
                    f'Failed to assign cluster to point {point}')

            # Detach the point from the cluster it was in during the
            # previous pass before attaching it to the nearest one.
            # NOTE(review): presumably add_point() records the owning
            # cluster on the point -- confirm against Cluster.
            if point.cluster is not None:
                point.cluster.remove_point(point)

            best_cluster.add_point(point)

        for cluster in clusters:
            members = cluster.points
            if not members:
                # A cluster with no members has no centroid to compute;
                # averaging would divide by zero. Re-submit the current
                # mean so the cluster stays where it is.
                # NOTE(review): presumably this also makes mean_moved()
                # report no movement for this cluster -- confirm against
                # Cluster.update_mean.
                cluster.update_mean(cluster.mean)
                continue

            # Averaging the xs and ys gives us the mean point of the
            # cluster.
            count = len(members)
            new_x = sum(p.x for p in members) / count
            new_y = sum(p.y for p in members) / count
            cluster.update_mean(Point(new_x, new_y))

        mean_needs_moving = mean_movement(clusters) > d

    return clusters