Monday, 26 September 2011

I want to partition a given set S into k subsets, numbered 0 to k-1,
such that every element of subset(i-1) is less than every element of subset(i), for every i in {1, 2, ..., k-1}.

Since the subsets themselves do not need to be sorted, is it possible to do this in O(n log n)?

I have implemented it with a randomized quicksort and a binary tree. I am trying to apply this concept to each block of an HDFS DataNode.

import java.util.*;

/**
 * Splits a list into at most {@code MAX_BUCKETS} buckets such that every element of one
 * bucket is smaller than every element of the next bucket, without sorting the elements
 * inside a bucket.
 *
 * <p>The decomposition is a truncated randomized quicksort: a bucket is split around a random
 * pivot, and every split is recorded as two children in a binary tree. The leaves of that tree
 * are the final buckets; read left to right they are in ascending value order.
 *
 * <p>The tree, the bucket registry and the id counter are shared static state, so concurrent
 * calls are not supported and bucket ids keep increasing across calls.
 */
public class QuickSortDecomposition {

    /** Source of randomness for the sample data and for pivot selection. */
    private static final Random rand = new Random();

    /** Upper bound on the number of buckets (tree leaves) produced per tree. */
    private static final int MAX_BUCKETS = 4;

    /** Number of sample elements produced by {@link #createRandomData()}. */
    private static final int TOTAL_ELEMENTS = 100;

    /** Sample values are drawn from {@code [0, MAX_VALUE_EXCLUSIVE)}. */
    private static final int MAX_VALUE_EXCLUSIVE = 500;

    /** Most recently assigned bucket id; 1 is reserved for the root of {@link #binaryTree}. */
    private static int lastBucketId = 1;

    /**
     * Contents of every bucket created by a split, keyed by bucket id. Intermediate (later
     * split) buckets stay registered; use {@link #isLeafNodeId(Node, int)} to pick the final
     * ones. The bucket of a tree root is the caller's input list and is not stored here.
     */
    private static final Map<Integer, List<?>> bucketMap = new TreeMap<Integer, List<?>>();

    /** A node of the partition tree; {@code value} holds the bucket id. */
    static class Node {
        Node left;
        Node Right;
        int value;
    }

    /** Partition tree used by {@link #main(String[])}. */
    private static final Node binaryTree = new Node();

    static {
        binaryTree.value = lastBucketId;
    }

    /**
     * Demo: partitions random integers and prints every final bucket with its value range,
     * followed by the size and leaf count of the partition tree.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        List<Integer> data = createRandomData();
        Collections.shuffle(data);

        // Only the side effects (tree + bucket registry) are reported below.
        quickSort(data, binaryTree);

        // The partitions are the leaves; bucketMap also contains intermediate buckets.
        System.out.println(" No of partitions : " + getLeafCount(binaryTree));
        System.out.println(" Buckets:  ");
        for (Map.Entry<Integer, List<?>> entry : bucketMap.entrySet()) {
            if (!isLeafNodeId(binaryTree, entry.getKey())) {
                continue;
            }
            System.out.println(" Bucket ID: " + entry.getKey());
            int min = Integer.MAX_VALUE;
            int max = Integer.MIN_VALUE;
            for (Object element : entry.getValue()) {
                // main only ever partitions Integers, so this checked cast is safe here.
                int value = (Integer) element;
                min = Math.min(min, value);
                max = Math.max(max, value);
            }
            System.out.println(entry.getValue());
            System.out.println("min: " + min + "   max: " + max);
        }

        System.out.println(" Binary Tree Size:   " + getSize(binaryTree));
        System.out.println(" Binary Tree Leaf Nodes:   "
                + getLeafCount(binaryTree));
    }

    /**
     * Counts the leaves of the tree rooted at {@code root}.
     *
     * @param root tree root, may be {@code null}
     * @return number of nodes without children; 0 for a {@code null} root
     */
    private static int getLeafCount(Node root) {
        if (root == null) {
            return 0;
        }
        if (root.left == null && root.Right == null) {
            return 1;
        }
        return getLeafCount(root.left) + getLeafCount(root.Right);
    }

    /**
     * Counts all nodes of the tree rooted at {@code root}.
     *
     * @param root tree root, may be {@code null}
     * @return total number of nodes; 0 for a {@code null} root
     */
    private static int getSize(Node root) {
        if (root == null) {
            return 0;
        }
        return getSize(root.left) + 1 + getSize(root.Right);
    }

    /**
     * Tells whether the tree rooted at {@code root} has a leaf whose bucket id is {@code value}.
     *
     * @param root  tree root, may be {@code null}
     * @param value bucket id to look for
     * @return {@code true} if a childless node with that id exists
     */
    private static boolean isLeafNodeId(Node root, int value) {
        if (root == null) {
            return false;
        }
        if (root.left == null && root.Right == null && root.value == value) {
            return true;
        }
        return isLeafNodeId(root.left, value) || isLeafNodeId(root.Right, value);
    }

    /**
     * Creates the sample input: exactly {@code TOTAL_ELEMENTS} random integers in
     * {@code [0, MAX_VALUE_EXCLUSIVE)}.
     *
     * @return a new mutable list of random integers
     */
    public static List<Integer> createRandomData() {
        List<Integer> list = new ArrayList<Integer>(TOTAL_ELEMENTS);
        // Strictly "<": the previous "<=" produced one element too many.
        for (int i = 0; i < TOTAL_ELEMENTS; i++) {
            list.add(Integer.valueOf(rand.nextInt(MAX_VALUE_EXCLUSIVE)));
        }
        return list;
    }

    /**
     * Partitions {@code data} into value-ordered buckets and returns the buckets concatenated
     * in ascending order.
     *
     * <p>Despite the name, the result is only bucket-ordered: every input element is present
     * exactly once and buckets appear in ascending order, but elements inside one bucket keep
     * their encounter order.
     *
     * @param data elements to partition; not modified
     * @param root root of the tree that records the splits; must be non-null whenever
     *             {@code data} is large enough to be split
     * @param <E>  element type
     * @return a new list holding all elements of {@code data} in bucket order
     */
    public static <E extends Comparable<? super E>> List<E> quickSort(
            List<E> data, Node root) {
        List<E> sorted = new ArrayList<E>(data.size());
        rQuickSort(data, sorted, root);
        return sorted;
    }

    /**
     * Partitions {@code data} as described in {@link #quickSort(List, Node)} and appends the
     * bucket-ordered elements to {@code sorted}.
     *
     * <p>A bucket is final once it holds at most {@code data.size() / MAX_BUCKETS} elements
     * (at least 1), or once the tree under {@code root} has {@code MAX_BUCKETS} leaves.
     *
     * @param data   elements to partition; not modified
     * @param sorted output list that receives every element of {@code data}
     * @param root   root of the tree that records the splits
     * @param <E>    element type
     */
    public static <E extends Comparable<? super E>> void rQuickSort(
            List<E> data, List<E> sorted, Node root) {
        int threshold = Math.max(1, data.size() / MAX_BUCKETS);
        partition(data, sorted, root, root, threshold);
    }

    /**
     * Recursive step: either finalizes {@code data} as a bucket or splits it around a random
     * pivot and recurses into both halves, left (smaller values) first.
     *
     * @param data      contents of the bucket represented by {@code node}
     * @param sorted    output list, filled in ascending bucket order
     * @param node      tree node of the current bucket
     * @param treeRoot  root of the whole tree, used to enforce the leaf limit
     * @param threshold buckets of this size or smaller are not split further
     */
    private static <E extends Comparable<? super E>> void partition(
            List<E> data, List<E> sorted, Node node, Node treeRoot, int threshold) {
        if (data.isEmpty()) {
            return;
        }
        if (data.size() <= threshold || getLeafCount(treeRoot) >= MAX_BUCKETS) {
            // Final bucket: emit all of it, not just its first element.
            sorted.addAll(data);
            return;
        }

        /* choose the pivot randomly */
        E pivot = data.get(rand.nextInt(data.size()));
        List<E> left = new ArrayList<E>();
        List<E> right = new ArrayList<E>();
        for (E next : data) {
            if (pivot.compareTo(next) > 0) {
                left.add(next);
            } else {
                right.add(next);
            }
        }

        if (left.isEmpty()) {
            // The pivot is the minimum. Split off its duplicates instead, so that no empty
            // bucket is created and no leaf of the limited budget is wasted.
            List<E> greater = new ArrayList<E>();
            for (E next : right) {
                if (pivot.compareTo(next) == 0) {
                    left.add(next);
                } else {
                    greater.add(next);
                }
            }
            right = greater;
            if (right.isEmpty()) {
                // All elements are equal: the bucket cannot be split any further.
                sorted.addAll(data);
                return;
            }
        }

        node.left = new Node();
        node.Right = new Node();
        node.left.value = ++lastBucketId;
        node.Right.value = ++lastBucketId;
        bucketMap.put(node.left.value, left);
        bucketMap.put(node.Right.value, right);

        partition(left, sorted, node.left, treeRoot, threshold);
        partition(right, sorted, node.Right, treeRoot, threshold);
    }
}