標籤

2013年7月31日 星期三

陣列-sparse matrix的generation

// sparse matrix structure
typedef struct spm {
   int row; // row position
   int col; // column position
   int val; // value
}spm;
// generate a sparse matrix from a
void generate_spm(int *a, spm *s)
{
   // row, column index and counting
   // for nonzero elements
   int r, c, count = 0;
   // initialize first row
   s[0].row = M;
   s[0].col = N;

   for (r = 0; r < M; r++)
      for (c = 0; c < N; c++)
         // if there is an nonzero element
         if (a[r*N + c] != 0) {
            count++;
            s[count].row = r;
            s[count].col = c;
            s[count].val = a[r*N + c];
         }
   s[0].val = count;
}
int main()
{
   int i = 0;
   printf("Enter the matrix elements:\n");
   unsigned size = 0;
   int val;
   while (scanf("%d", &val) != EOF)
      put_to_tail(val);
   size = show_list_and_size();

   int *a = (int*)malloc(sizeof(int)*size);
   list_to_array(a);

   // input M
   FILE *file_in_M;
   file_in_M = fopen("M_size", "r");
   fscanf(file_in_M, "%d", &M);
   if (size % M != 0) {
      fprintf(stderr,"size % M != 0\n");
      exit(1);
   }
   fclose(file_in_M);
   N = size / M;


   int count_non_zero = 0;
   // counting nonzero elements of a
   for (i = 0; i < size; ++i)
      if (a[i] != 0)
         count_non_zero++;
   // allocating s with size of nonzero elements+1
   spm *s = (spm*) malloc(sizeof(spm)*(count_non_zero+1));

   // generate sparse matrix a to s
   generate_spm(a, s);
   //print out the result
   printf("   row  col  val\n---------------\n");
   for (i = 0; i <= count_non_zero; ++i)
      printf("%5i%5i%5i\n", s[i].row, s[i].col, s[i].val);
   return 0;
}   
首先這程式要能work,就先要把sorting的準備的這些function放進去。

這是陣列的一個應用。把一個sparse matrix(也就是matrix的elements有非常多都是0)轉成一個更精簡的matrix,使得:
1. 比較省記憶體空間
2. 處理矩陣運算速度能夠更快

有使用matlab的人如果有研究過的話,應該會曉得這軟體在處理sparse matrix是採用精簡matrix。你可以把它看成是一個對應表格。首先這種表格第一行就是這個矩陣的相關資料。如果這矩陣是M*N的大小,並且有100個非零項的話,那麼這個精簡表格的第一行會是如此的表示:
M   N   100。
接下來的每一行都會是根據非零項的相關位置來對應其值:
原矩陣:
0  3  0  0  1
0  2  1  0  3
7  0  9  0  1
表格矩陣:
3  5  8
0  1  3
0  4  1
1  1  2
1  2  1
1  4  3
2  0  7
2  2  9
2  4  1

看起來好像似乎資料量變多,但其實真正的sparse matrix的非零項會是更多的,假設有一個1000* 1000的matrix,非零項只有10,那麼資料型態若是int的話,原本的矩陣所需要的記憶體空間量將會是sizeof(int) * 1000*1000 但使用這種精簡矩陣的空間量將會大幅減少為sizeof(int)*(10+1) 減少了機乎是100,000倍的空間量。flop次數也更不用講了。

因此先從大方向的main開始講解其架構:

put_to_tail會從io那邊得來的資料放入link list中。接著show_list_and_size會show出link list的資料,來確認放入link list的資料是對的,並同時會回傳資料的筆數。而以此資料的筆數我們就能夠memory allocate資料的大小,並使用list_to_array把link list放入array a以內。

因為筆者的研究本身都是巨量資料,因此是需要memory allocate在記憶體中的,所以這種方式是用一維的陣列來儲存矩陣的資料。因此需要矩陣的input的size M*N。所以開了檔案,input進去M之後check看看這是不是合法的矩陣便能夠了解大小N。

而在generate_spm內我們把矩陣a把它弄成只收集非零項的資料,但即使如此我們還是必須要事先知道大小,因此我們需要一個變數count_non_zero去先了解矩陣有幾個非零項,然後才去mem. allocate我們要的精簡後的matrix s。

把sparse matrix a轉成s後就把s的elements印出來確認無誤。以上就是主要的操作流程。

所以我們需要自訂資料型態spm(sparse matrix的簡寫),有三個elements:
// sparse matrix structure
typedef struct spm {
   int row; // row position
   int col; // column position
   int val; // value
}spm;
這三個資料就會是我們精簡表格每一個row各自代表的資料,你可以把這矩陣想像成類似坐標那樣,我們需要一個點的資料就必須知道他的坐標,row就像是y坐標,而col就類似x坐標,而val就是這個點的值。這個資料型態將會儲存非0項的位置和其值。

接著generate_spm就將會把sparse matrx(一個1D的array)轉換成表格s。首先這個表格的第一行資料將會是這個sparse matrix的M,N,以及非零項的個數,於是我們就先輸入M和N:

   s[0].row = M;
   s[0].col = N;
接著當然我們也可以從外面的count_non_zero來輸入s[0].val。只是目前這邊沒有這樣子做。

然後r,c迴圈去做a的每一個elements的尋訪,找出非零項之後各自輸入到s上:
   for (r = 0; r < M; r++)
      for (c = 0; c < N; c++)
         // if there is an nonzero element
         if (a[r*N + c] != 0) {
            count++;
            s[count].row = r;
            s[count].col = c;
            s[count].val = a[r*N + c];
         }
最後輸入counting的大小,精簡矩陣s就這樣轉換完成了。

2013年7月29日 星期一

quick sorting的研究和改進

int divide_and_swap_with_d(int *a, int left, int right, long int d)
{
   // regart the first element as the pivot
   int pivot = a[left];
   int l, r;
   l = left;
   r = right + d;
   do {
      // find out l which the element is bigger/equal than
      // pivot from left hand side
      //do l++; while (l < r &&a[l] < pivot);
      do l += d; while (l<=right && a[l] < pivot);
      // find out r which the element is less/equal than
      // pivot from left hand side
      do r -= d; while (r>=left && a[r] > pivot);
      if (l < r) swap (a+l, a+r);
   } while (l < r);
   // swap both
   a[left] = a[r];
   a[r] = pivot;
   return r;
}
void quick_sort_with_d(int *a, int left, int right, long int d)
{

   int pivot;
   if (right > left) {
      // return the pivot in [left:right]
      pivot = divide_and_swap_with_d(a, left, right, d);
      // recursive sorting between [left:pivot-1]
      quick_sort_with_d(a, left, pivot - d, d);
      // recursive sorting between [pivot+1:right]
      quick_sort_with_d(a, pivot + d, right, d);
   }
}
// shell sort with quick sort kernel
void shell_plus_quick_sort(int *a, int size, int q, int p)
{
   int start, end;
   long int d = 1;
   // find out d_{n+1} < size
   do {
      d = q*d + p;
   } while (d < size);
   d = (d - p)/q;
   d = (d - p)/q;

   // use quick sort until division distance > 1
   while (d > 1) {
      end = size - (size%d);
      for (start = 0; start < d; ++start)
         quick_sort_with_d(a, start, start + end - d, d);
      d = (d - p)/q;
   }
   // normal quick sorting
   quick_sort(a, 0, size - 1);

}
雖說這是一個嘗試的改進呢,但事實上是一次失敗的改進。

這次的改進其實是使用shell sorting,但做sorting的kernel不是insertion,而是使用quick sorting。測量了速度來說,慘不忍睹阿,跟一般的quick又或是shell來比的話,真的速度太慢了。有興趣的人可以自行測看看速度。因為結果太差了,所以把這次的嘗試的失敗的改良放上來討論為何會失敗。

事實上寫到一半的時候大概知道這個跑起來不會比較快而且會慢很多,原因在於它和當時改進insertion使用link list的原因是一樣的,同樣是cache miss的問題。

最主要就是慢在這個kernel:divide_and_swap_with_d,是它造成有這種cache miss的問題,原因在於每次在比較a[l]或a[r]的時候,由於並非是連續的記憶體分佈,造成很有可能抓資料的時候,資料並沒有在cache上面。而必須要通過主記憶體去拿。這樣子一來一往就慢了,簡而言之就是遠水救不了近火。

那麼為什麼使用insertion sorting的kernel反而比較快?明明不是quick sorting本身比insertion sorting快嘛?怎麼可能會使用慢的kernel會比較快呢?

原因在於我們在做insertion sorting的kernel的時候,使用了一個tricky的方法,它並非如所宣稱的順序那樣sorting...比如說d=3的case,他會作這樣的sorting順序:
0 3 1 4 2 5 3 6 4 7 5 8.....
但以演算法宣稱的方式而言,理論上應該是要按照這樣的順序:
0 3 6 9 12.......    1 4 7 10 13.....    2 5 8 11 12.....
而事實上這兩種方式的結果將會是一模一樣的,只是先算後算的差別而已。
但更改這種方式將會造成速度上的差異,原因就在於cache memory一次都抓一把東西進去,以第一個方式去sorting的話,cache首先也許會抓個16個data進去,也就是在cache裡面,0~15這個範圍內的排序都在裡面做,不需要一直往記憶體那邊拿,但是如果採用第二個方法,每排6個data就得要再次的去記憶體那邊抓資料,會比較沒有效率。當然cache的實作我不是非常了解,他會一次抓幾個data不是很確定,但是可想而知的光是效率部分就會有差。


為了證實我的推論,於是二話不說改寫了下傳統的shell sorting,把他寫成第二個方法並與第一個方法比看看速度,以下是測試結果:
timing for shell 0(d=3*d+1): 13.000000
timing for shell 1 (d=3*d+1): 16.000000

shell 0是原本的寫法,而shell 1是使用演算法宣稱的方式,的確是比較慢,但是速度上差異不明顯,在此使用了20000000大小的array,random的range也是20000000。

以下是改寫成為演算法宣稱的source code:

void shell_sort_1(int *a, int size, int q, int p)
{
   // the element that want to insert in
   unsigned sofar;
   // previous element index before sofar
   int prev;
   // saving the value of a[sofar]
   int a_sofar;
   int start;
   long int d = 1;
   // find out d_{n+1} < size
   do {
      d = q*d + p;
   } while (d < size);
   d = (d - p)/q;
   d = (d - p)/q;

   // use insertion sort until division distance = 1
   do {
      for (start = 0; start < d; ++ start)
         for (sofar = start + d; sofar < size; sofar += d) {
            a_sofar = a[sofar];
            prev = sofar - d;
            while (prev >= 0 && a_sofar < a[prev]) {
               a[prev + d] = a[prev];
               prev -= d;
            }
            a[prev + d] = a_sofar;
         }
      d = (d - p)/q;
   } while (d >= 1);
}
有興趣者請自行編譯研究。

2013年7月23日 星期二

quick sorting

int divide_and_swap(int *a, int left, int right)
{
   // regart the first element as the pivot
   int pivot = a[left];
   int l, r;
   l = left;
   r = right + 1;
   do {
      // find out l which the element is bigger/equal than
      // pivot from left hand side
      do l++; while (a[l] < pivot);
      // find out r which the element is less/equal than
      // pivot from right hand side
      do r--; while (a[r] > pivot);
      if (l < r) swap (a+l, a+r);
   } while (l < r);
   // swap both
   a[left] = a[r];
   a[r] = pivot;
   return r;
}
// quick sorting
void quick_sort(int *a, int left, int right)
{

   int pivot;
   if (right > left) {
      // return the pivot in [left:right]
      pivot = divide_and_swap(a, left, right);
      // recursive sorting between [left:pivot-1]
      quick_sort(a, left, pivot - 1);
      // recursive sorting between [pivot+1:right]
      quick_sort(a, pivot + 1, right);
   }
}
主要有兩個function,其實是可以寫成同一個function省記憶體。因為這邊這個quick sorting是很浪費記憶體空間的sorting,如果把一個function拆成兩個會更浪費記憶體。但為了教學和理解方便將它拆成兩部分。如果對於概念已熟稔,就把它們寫成同一個function來實作。

這個sorting在寫的時候根據經驗是非常容易寫錯的,雖然概念上很簡單,但我實作我的一本參考書把整個code一模一樣打上去的時候是不能run的。於是最後參考了Horowitz的資料結構而寫成。經過測試是沒問題的。

quick sorting的理論其實很簡單,就是把第一個element當做是基準值,接著從左邊開始往右找到一個大於等於基準值的第一個數,然後從右邊找到小於等於基準值的第一個數,接著把它們swap。接著繼續找直到從左邊找的和從右邊找的位置相交為止。然後把基準值換到最後從右邊找不到比它大的數的那個位置作swap。因此左邊的數就會比基準值小(或等於),右邊的數就會比基準值大。接著再用同樣的方式從左邊到基準值的這一段做quick sorting,然後從基準直到右邊也做同樣的quick sorting。

以排隊的例子來說,簡單來講就是把隊伍中第一個抽出來,然後讓比他矮的排他左邊,比他高的排他右邊。排的方式就是從左邊開始找出比他高的,然後和從右邊開始找出比他矮的交換位置。一直持續的找找到位置是相交的為止。 最後就把那個原本的第一位排到那個和它自己一樣大又或者是比他小一點點的那個位置。

但是呢這時候雖然這個人的左邊都是比他矮的,但是就是未排序,因此用同樣的邏輯去排他左邊的那一堆人。右邊的也是同樣的方式。

因此最後將會每個人的右邊將會比他高,左邊的就會比他矮。這就是sorting的樣貌。

   if (right > left) {
      // return the pivot in [left:right]
      pivot = divide_and_swap(a, left, right);
      // recursive sorting between [left:pivot-1]
      quick_sort(a, left, pivot - 1);
      // recursive sorting between [pivot+1:right]
      quick_sort(a, pivot + 1, right);
   }
首先是這個程式的邏輯。if所包著的,就是確保我們在排的時候裡面還是在相交之前。而pivot = divide_and_swap(a, left, right);這一行在做的就是我們從左區間left到右區間right中做排列,抽出left這個位置的value當做基準值pivot,然後讓左邊的值都比pivot小,讓右邊的值都比pivot大,最後再返回pivot的位置。

然後用同樣的邏輯去排pivot左邊那群人,以及右邊那群人就是這邊程式的邏輯。有點像是二分搜尋法的感覺,只是多了左右交換使得左邊比較小而右邊比較大。
因此在divide_and_swap內:
   do {
      // find out l which the element is bigger/equal than
      // pivot from left hand side
      do l++; while (a[l] < pivot);
      // find out r which the element is less/equal than
      // pivot from right hand side
      do r--; while (a[r] > pivot);
      if (l < r) swap (a+l, a+r);
   } while (l < r);
從左邊找出l(L,不是123的1)這個位置,使得它的值比pivot大(或等於)。
用同用的邏輯從右邊找到r,其值比pivot小(或等於)。
如果這兩個位置沒相交的話,就讓它們兩個交換,因此如果最後的pivot放到這個相交點的話:

   a[left] = a[r];
   a[r] = pivot;
那麼經過do while迴圈後pivot左邊的那群value將會小於等於pivot,右邊的將會大於等於pivot。

2013年7月20日 星期六

shell sorting的研究和改進

理論上來說shell 是insertion的改進版。但究竟能產生多少的差別呢?
首先我使用100,000個elements的random array,range是100,000。

timing for bubble: 40.000000
timing for big one: 12.000000
timing for insertion: 9.000000
timing for insertion(with link list): 48.000000
timing for shell: 0.000000

似乎看不到有什麼差別阿,於是這次就使用400,000個elements,random range400,000。
timing for bubble: 592.000000
timing for big one: 200.000000
timing for insertion: 120.000000
timing for insertion(with link list): 1212.000000
timing for shell: 0.000000

唔,當其他的演算法已經開始以分鐘來計時的時候,我們仍然看不到shell的底線到底在哪裡。而其他的演算法果然是大約是以O(N^2)來增加時間複雜度。當然比較麻煩的就是insertion with link list這種很吃記憶體和cache的memory bandwidth就已經不是用單純的演算法來估計了。還必須加上這種memory transferring bound的問題。

因此一口氣催下去看看shell的底線在哪吧!增加個20倍:8000,000個elements, range 8000,000:
timing for shell: 4.000000

終於是看到它的車尾燈了。其他的就沒有跑測試了,因為照這樣下來的話我要至少跑200個小時以上才能跑完這個測試。

所以用時間複雜度來單純估計insertion,timing大約會是120*20^2  =48000
speed up大約是:48000/4=12000倍....差太多了。

至於shell有沒有可以改進的方法。也許有,比如他的kernel不要用insertion,可以改用其他的。但目前為止最快的方式還是insertion。如果可以的話,等到介紹quick sorting的時候試試看把quick sorting當kernel來嘗試shell sorting。

另一種改進的方法就是不斷的去試d的增長值。目前是d=3*d+1,如果可以的話就試試其他的方式。比如d=3*d+2?
timing for shell: 5

看起來似乎差不多,而且我用的timer很爛,為了減低誤差所以把資料量提高至50000000個:
timing for shell(3*d +1):36
timing for shell(3*d +2): 47

的確目前看起來d=3*d+1是最快的方式 。
接下來再試個d=7*d+3
結果測試:
timing for shell:??
好久...

結果用20個elements一樣做不出來,各位知道為什麼呢?在code裡面把d的value印出來會發現d的value竟然是錯的!因為對於int的value來說,如果d是負的,在記憶體的第一個element會是標示為1,但是現在我們用的d是unsigned,因此對unsigned來說這個1不是負的,而是一個很大的數。因此如果你把它印出來就會發現這個值太大了!

於是把code裡面的unsigned d換成是long int d 再跑一次d=7*d+3
timing for shell(7*d+3):67
好像更久了。

不過呢,這樣做似乎是沒什麼效率,彷彿只是隨便撈兩三個數字來去證明3*d+1是最快速的方式,說服力實在低弱的很。不如就系統化的做下來吧!把程式稍微改寫一下,用迴圈不斷的輸入參數,以下是測試結果:

timing for shell(d=7*d+1): 32.000000
timing for shell(d=7*d+2): 31.000000
timing for shell(d=7*d+3): 32.000000
timing for shell(d=7*d+4): 33.000000
timing for shell(d=7*d+5): 35.000000
timing for shell(d=7*d+6): 33.000000
timing for shell(d=6*d+1): 27.000000
timing for shell(d=6*d+2): 231.000000
timing for shell(d=6*d+3): 217.000000
timing for shell(d=6*d+4): 233.000000
timing for shell(d=6*d+5): 29.000000
timing for shell(d=5*d+1): 26.000000
timing for shell(d=5*d+2): 27.000000
timing for shell(d=5*d+3): 29.000000
timing for shell(d=5*d+4): 28.000000
timing for shell(d=4*d+1): 27.000000
timing for shell(d=4*d+2): 233.000000
timing for shell(d=4*d+3): 29.000000
timing for shell(d=3*d+1): 24.000000
timing for shell(d=3*d+2): 22.000000
timing for shell(d=2*d+1): 26.000000

看起來真奇怪,有某些數時間花的特別的長,大約接近十倍之多。反覆測那些數果然是沒有錯的,的確真的就是需要花比其他的時間還長。怎麼會這樣,如果看規律的話,可以看見那些數有個重要的特點那就是乘數和加數除了加數是一以外,它們會是倍數關係。,所以會有redundant的排列導致大的d和小的d是倍數關係,等於其實大的d是沒有必要的排列。多餘的排列。

舉例:d=6*d+3 當d=9以及d=2073的時候他們剛好是可以整除的倍數關係。那麼當我們排到d=9的時候事實上就等於d=2073也順便排一次了。那麼等於2073那次其實是多餘的排列。

其他時間比較多的數也因此可以做同樣的類推。應該算是合理的。至於為什麼會差十倍之多而不是比較少其實我們光看shell和insertion就可以了解他們的速度是差了十萬八千里。因此如果d少了一兩層的排列會少個十倍的速度我想應該是合理的,否則就不能解釋shell和insertion有時間距大的差異。因此像這種乘數與加數有倍數關係的case會發現d有一些是多餘的sorting就會差個十倍應該會是合理的。如果我們把array的element數再多個一千倍我預估d至少會少個一層,那麼這之間這些關係速度就不會只差十倍了,也許是五十倍又或是一百倍。但礙於時間沒有這麼多,因此只剩預估在這。如果有人有興趣的可以自己去跑看看是否會如我的預料發展。

因此測試我的理論是否是正確的就是測看看這種case: d=5*d 以及比較d=5*d+1, d= 5*d+2....的速度上的比較。我預估d=5*d會是相當劇烈的慢的。但由於我相當的相信我的理論是對的所以就懶的測試了。有興趣的人可以去玩玩看!

2013年7月17日 星期三

shell sorting

// shell sort
void shell_sort(int *a, int size)
{
   // the element that want to insert in
   unsigned sofar;
   // previous element index before sofar
   int prev;
   // saving the value of a[sofar]
   int a_sofar;
   unsigned d = 1;
   // find out d_{n+1} < size
   do {
      d = 3*d + 1;
   } while (d < size);
   d = (d - 1)/3;
   d = (d - 1)/3;

   // use insertion sort until division distance = 1
   do {
      for (sofar = d; sofar < size; ++sofar) {
         // save a[sofar]
         a_sofar = a[sofar];
         // previous index start from sofar - 1
         prev = sofar - d;
         // while previous element is larger then
         // the value that we want to insert, then
         // push it to the right hand side
         while (prev >= 0 && a[prev] > a_sofar) {
            a[prev + d] = a[prev];
            prev -= d;
         }
         // then a[prev] is small or equal than a_sofar
         // so we can insert a[sofar] to this position
         a[prev + d] = a_sofar;
      }/* sofar */
      d = (d - 1)/3;
   } while (d >= 1);

}
以上是Shell sorting的程式碼,基本上他算是insertion sorting的延伸。你可以想像一列身高沒有排好的隊伍,原先的insertion sorting就是從頭開始一個一個的去排,把後面那個依據身高來往前
插入放到正確的身高位置,並以同樣的邏輯如此的排到最後一位。

而insertion sorting的缺點就是如同bubble sorting一樣,資料量越大的話那麼會有兩個要素就會跟著變大1.比較次數,2.資料搬移次數。只是insertion 比傳統的bubble sorting好一點的地方就是insertion的期望比較次數和資料搬移次數會是當下已排序量的一半。因此bubble sorting比較搬移次數如果是大約O(N^2)那麼insertion會是這個期望值的一半(雖然如此但成長倍數仍然是N^2)。

那麼如果本身資料已經有些排序的話,基本上insertion比較次數會比較少。因此藉由這樣的想法而發展出所謂的shell sorting。

他的邏輯很簡單。首先我給它一個number d 假設他是10好了。具體的作法如下:
1.從這隊伍中從第一位開始數每10個人一數我就叫人出來。如果整體有1000個人我這次就抽了一百個人出來。
2.對這十個人做insertion排序
3.接下來把這100個已排過順序人放回隊伍之後,接下來從第二位開始數,也是每10個人一屬然後一樣做insertion排序
4.直到從第9位開始數已經排序完之後,我們就縮短d,比如8 或7 或其他更小的數。
5.d越來越小直到d=1的時候就等於是普通的insertion sorting了。

而有人做演算法實驗發現如果d_{i+1}= 3*d_i +1的這種方式來算組距,當一開始d_n如果滿足d_{n+2} <N的時候,大量資料的情況之下是最理想的,時間複雜度將會到O(N^1.25)

因此以下是程式碼的對應:

前面的sofar,a_sofar,prev和Insertion sorting的notation用的是一樣的,sofar意思是目前排序的位置,也就是待排序數。a_sofar就是a[sofar],prev就是在sofar之前的index。


   unsigned d = 1;
   // find out d_{n+1} < size
   do {
      d = 3*d + 1;
   } while (d < size);
   d = (d - 1)/3;
   d = (d - 1)/3;
在這邊一開始當然設的組距d是1,因為畢竟最後就是要做insertion sorting,而按照最佳的測試。d=3*d+1,當d_n滿足d_{n+2}<N的時候就是最好狀況。因此這邊首先d=1是因為最後要做的是insertion,而中間那個do while回圈就是要先找到d_n<N的狀況。接著往回回溯兩次就會是這個d_{n+2}<N了。


      for (sofar = d; sofar < size; ++sofar) {
         // save a[sofar]
         a_sofar = a[sofar];
         // previous index start from sofar - 1
         prev = sofar - d;
         // while previous element is larger then
         // the value that we want to insert, then
         // push it to the right hand side
         while (prev >= 0 && a[prev] > a_sofar) {
            a[prev + d] = a[prev];
            prev -= d;
         }
         // then a[prev] is small or equal than a_sofar
         // so we can insert a[sofar] to this position
         a[prev + d] = a_sofar;
      }/* sofar */
這個很熟悉嗎?是的沒錯,這就是普通的insertion sorting。只是它是每間隔d開始做一次sorting。首先為了教學方便我們先假設d=10。於是我們先從隊伍中第十個人開始與前面每隔10個人就抽出來的人開始做insertion排序。當然在這邊就只有第一個人跟他做insertion排序。這兩個人排好之後就換第11個!注意這邊的邏輯會有點不太一樣。理論上來說是從隊伍的第20個開始跟原先這兩個排序好的傢伙再次排序,接著是第30,40,50... 最後才是從隊伍的第十一個開始與第二個人開始作排序,接著是第二十一個與第十一個,第二個人排序。如此下來的。但這邊的方式是不太一樣的。但基本上演算方式其實是和我們一開始的設計是一樣的,只是這邊的順序就是有經過調整。但是最後的排序方式都會是和我們的策略是一模一樣的。

因此在某個組距d之下都已經排好的話那麼就開始調整d的大小。

   do {
      d = (d - 1)/3;
   } while (d >= 1);
慢慢的縮小組距d,直到最後d=1做普通的一般insertion sorting為止。這就是shell sorting。基本上就是insertion sorting,只是是有組距版本的。

2013年7月15日 星期一

insertion sort 的研究和改進

Insertion的方式有個缺點,就是當你排序到某個程度的時候,資料需要大量的往後搬移。以數學上的機率來說排序量大到某個程度的時候,搬移的次數將會是你目前排序量的一半。

比如說一個random的待排序array,目前已經排了第100個了,那麼這第101個數在機率上在這前100個值內的分布理論上來說是均勻的,因此你需要排序的次數的期望值就會是一半:
<次數>=sum_某數 (某數次數*某數機率)

當然這是很naive的推測,重點在於只是為了闡述搬移次數是不容小覷的!

除了搬移次數以外,還有個另外的問題就是比對大小次數。那個while迴圈內是不斷的判斷prev這個位置的值是否比欲排序值還大又或是小,在cpu的層次來看,array的element事實上不是放在register的,而是放在cache甚至更有可能的是放在距離cpu更遠的memory上,真正放在register的是prev和那個欲比較的值。因此這個while迴圈等於是不斷的從memory上拿東西來做比較。但事實上a[j]以及a[j+1]他們的值是很接近的,尤其是排序到某個大小的程度之後是如此。如果每次都一個一個的去比較a_sofar的話是沒有太大意義,浪費一堆時間!

而解決第一個問題的方式很簡單,就是使用link list的資料結構,這樣就不需要在那一個一個搬移資料,直接插入值就行了。 
而我們由於在這篇文章: 排序前的準備 已經老早就有現成的link list可以用了,所以不用在那煩惱怎麼把array轉成link list

而解決第二個問題就使用切分來去快速找出符合大小的位置,也就是二元搜尋法。

以下是使用Link list的方法來解決搬移次數的問題:
void insertion_sort_use_LL(int *a)
{
   data_node *sofar, *rush, *b4_sofar, *smaller;
   int val_sofar;
  // previous node of sofar
   b4_sofar = head;
   // begin sofar at the 2nd node
   sofar = head->next;

   // run over all elements
   while (sofar != NULL) {
      val_sofar = sofar->data;
      // compare val_sofar from head to sofar
      smaller = rush = head;
      while (rush != sofar && rush->data <= val_sofar) {
         // if data  of rush is smaller than val_sofar,
         // move to the next element
         smaller = rush;
         rush = rush->next;
      }
      // if sofar is in order, then go to next and continue
      if (rush == sofar) {
         b4_sofar = b4_sofar->next;
         sofar = sofar->next;
         continue;
      }
      // de-link of sofar
      b4_sofar->next = sofar->next;
      // insert sofar between smaller and rush
      sofar->next = rush;
      // if sofar is the smallest, then put
      // it to head, else put it after smaller
      if (rush == head)
         head = sofar;
      else
         smaller->next = sofar;
      // go to next sofar
      sofar = b4_sofar->next;
   }/* sofar */

   // put the result back to array
   list_to_arry(a);
}
由於link list還尚未介紹,但如果你了解link list那便足夠了解這部分的程式邏輯。
最外層的迴圈:
  while (sofar != NULL)

就如同我們在普通insertion_sort一樣,sofar是從第2個node開始直到最後。而由於我們一開始程式的link list的tail node接的是NULL,因此我們便用這個方式判斷sofar是不是跑到結尾了。

而接著當我們把我們想要插入sofar的node的value拿出來assign給val_sofar,放到硬體的register上方便運算快速,在這之後便開始跑內迴圈:
 
      while (rush != sofar && rush->data <= val_sofar) {
         // if data  of rush is smaller than val_sofar,
         // move to the next element
         smaller = rush;
         rush = rush->next;
      }
這個內迴圈就是為了找出sofar應該要插入的位置,一樣是和普通的insertion_sort的內迴圈是一樣的對應,其中rush是往下一個node前進,而smaller則是rush前一個node,在此我們先要讓rush終止在sofar,防止他跑向不該跑向的位置。接著開始判斷大小已判定該有的位置,因此在這迴圈過後,會有2種情形。

1. rush等於sofar這個pointer跳出迴圈,意思是sofar比前面的都還要大。那麼我們不需要insert sofar, 直接往下一個node邁進,因此在這迴圈底下就會接了一個判斷式:

 
      if (rush == sofar) {
         b4_sofar = b4_sofar->next;
         sofar = sofar->next;
         continue;
      }
就是直接往下個node邁進。

2.找到適合的位置而跳出迴圈。這種情形我們該做的是:把sofar這個node拔起來:
 
     b4_sofar->next = sofar->next;
然後把sofar的尾端跟rush對接:

     sofar->next = rush;
接著把smaller的尾巴接到sofar上,但如果sofar是最小的點的話,那麼smaller會=rush也就是上面這個while的迴圈連跑都沒有跑,因此這時候sofar就會是這個link list的頭。最後我們朝向下一個sofar前進:
      if (rush == head)
         head = sofar;
      else
         smaller->next = sofar;
      // go to next sofar
      sofar = b4_sofar->next;
最後整個迴圈過後我們在運用早已寫好的 list_to_arry(a);把link list放回array。
====================================================================

但是我個人的猜測是,使用Link List的方式其實效率會比原先的方式差。而且沒錯的話應該會差蠻多的。先不跑測試的話,光用分析的方式,我們會發現在找出資料該插入適合的位置的時候,是採取一個node 一個node去trace。她不像是array在記憶體編排上是連續的形式,反而是分散的狀況,每個node和每個node之間其實是不連續的。

我們來看普通的link list 的node的資料結構是:
typedef struct data_node {
   int data;
   struct data_node *next;
}data_node;

我們來觀察,當我們從一個node到下一個node的時候,會做哪些事情:
1. 首先我們需要找出pointer內資料放的address。
2. 從這address中去記憶體提領next這個pointer內的資料,裡面包含了data以及一個pointer。

這樣子會造成什麼樣的問題呢?主要是cache miss的問題,由於每個node放在記憶體中的位置很可能不是連續的,因此cache update的時候從記憶體抓資料的時候很有可能都不會抓到next的node,那麼這樣子下來每次迴圈要跑下一個node的時候,cpu每次就得要等從memory拿資料到cache上,而不是像一般insertion的方法,幾乎資料都是一次抓一把到cache的slot中,使用array能夠有效利用cache update的優勢,即使要搬移資料其實也只需要在cache上搬移而已。因此單從計算的角度看起來,效率非常有差別。

當然普通的insertion有個問題就是資料量大的時候,數學機率考量的話,下一筆資料在每次迴圈比較的時候需要搬移的次數就會相對的大。更不用說要同時處理cpu管線處理的問題。但是由於現在的機器都已經使用預測表技術了,而加上資料會在cache上待命的狀況,其實問題就會小很多。
-----------------------------------------------------------------------------------
以下是使用了60000個elements的random array, range:[0:1000]的測試結果:
timing for insertion: 4 secs
timing for insertion using link list: 13 secs

以下是range:[0:60000]的測試結果:
timing for insertion: 4 secs
timing for insertion using link list: 13 secs

看起來似乎改變range差不多,當然我是用最陽春的timer,不過理論上差別不會太大。
因此預測應該是沒錯的 。雖然link list的方法大大減免了資料搬移的動作,但是畢竟memory bandwidth還是不可能跟得上cpu本身的運算時脈,cache miss的問題還是最能左右結果。因此作演算法的時間複雜度分析其實不能做很簡單的推測,主軸還是要先對硬體的特性有基本的了解才去分析才有真正的意義。


2013年7月12日 星期五

Insertion sort

// insertion sorting
void insertion_sort(int *a, int size)
{
   // the element that want to insert in
   unsigned sofar;
   // elements before sofar
   int prev;
   // saving the value of a[sofar]
   int a_sofar;

   // insertion for each elements
   for (sofar = 1; sofar < size; ++sofar) {
      // save the value of a[sofar]
      a_sofar = a[sofar];
      // previous index start from sofar - 1
      prev = sofar - 1;
      // while previous element is larger then
      // the value that we want to insert, then
      // push it to the right hand side
      while (prev >= 0 && a[prev] > a_sofar) {
         a[prev + 1] = a[prev];
         prev--;
      }
      // then a[prev] is smaller or equal than a_sofar
      // so we can insert a[sofar] to this position
      a[prev+1] = a_sofar;
   }/* sofar */
}

Insertion,插入排序法,是個資料量小時可以做的比較有效率的排序演算法。但是先撇開細部的討論,他的方式就是我們把它想像成身高不同的人排成一列,所以身高是參差不齊沒有排列過的。於是我先就把前面數來這第二個人把他從隊伍中叫出來叫他站在第一個人的旁邊,並且判定是否有沒有比第一個人矮,有的話就讓第一個往後退,第二個往前站在第一個人的位置。於是前面兩個已經暫時排序好了。

接下來我就叫第三個帥哥離開隊伍,叫他跟前面這第二個人做比較身高,如果這帥哥比較矮那麼這第二個人就往後退,接下來再次叫這帥哥跟第一個人比較一下身高,如果這帥哥又在次的比較矮那第一個人就往後退,帥哥這時就能夠順理成章的當第一個-目前最矮的傢伙。如果沒有那麼這個帥哥就排入原先第二個人的位置。(不要忘記這時候第二個人因為比這帥哥高,所以已經往後退退到帥哥原本的位置了,所以第二個位置就空起來了)

接下來的方式就是一模一樣,叫第四個跟前面那幾位來比較一下身高。第五個第六個亦然是如此。

如果你上面的說明有點看不懂或頭昏腦脹,我以下就每個句子就對應到程式碼來說明。
================================================================
首先我們把第二個人叫出來,於是你可以看到這個for loop是從這個隊伍的第二個人開始,然後在裡面持續的做身高的比較,直到最後的那個人也比較過:
   for (sofar = 1; sofar < size; ++sofar) {
   }

所以我們把這個帥哥從隊伍中叫出來:
     a_sofar = a[sofar];
把帥哥的身高記錄下來,這時候這個原本帥哥的位置就已經空掉了。之後他前面那個人如果比帥哥還高,那麼它就可以順理成章站在帥哥的這個位置,如果沒有,之後帥哥在乖乖的站回他原本的位置就行了。

接著我們開始把帥哥的身高和前面這幾位比較,首先我們比較他前面那一位:
      prev = sofar - 1;
並開始一個一個去比較,如果前面的一直比帥哥身高還高,就往後站,直到前面的某一位和帥哥身高一樣又或是比帥哥矮:
      while (prev >= 0 && a[prev] > a_sofar) {
         a[prev + 1] = a[prev];
         prev--;
      }
請注意這邊的比較迴圈有個先決條件就是前面的人不是幽靈(prev >= 0 )因為隊伍是從第0個開始排的,並沒有第-1個第-2個...所以我們設了這樣的條件,防止prev--會跑到幽靈人物那邊去。

最後這空的位置當然就給帥哥啦!
      a[prev+1] = a_sofar;
會跳出while比較迴圈並執行 a[prev+1] = a_sofar;只有三種情形:
 1. 一開始帥哥就比前面的還高,所以while不執行,直接執行 a[prev+1] = a_sofar;也就是帥哥被塞回去

2. 帥哥的身高中等,於是依他的身高前後都有人,那麼必然是因為while迴圈比較到一半發現他不符合a[prev] > a_sofar 因此執行了a[prev+1] = a_sofar; 所以prev這個人比帥哥矮或一樣,他沒有往後退,而prev+1這個位置原本的人因為比帥哥高,所以早就已經往後退了,因此這個prev+1的位置是空的,帥哥塞到這個位置是沒錯的。

3. 帥哥是最矮的,因此他是因為while迴圈發現了不符合prev >= 0(也就是prev = -1)了,所以該跳出迴圈去執行a[prev+1] = a_sofar; 那麼這時候a[prev+1] = a[0] 理所當然的帥哥應當順理成章的當目前最矮的傢伙。

bubble sort的研究和改進

在看這個文章之前,請回去看bubble sort中關於surface是什麼意思
========================================================================
關於bubble sort有個麻煩的地方就在於如果是一般任意的資料形式, 它swap的次數非常的多。
而且每次都會從第0個element開始去做sorting直到surface。
因此它唯一的好處只是為了找出最大的value,然後放到array的最右邊。

你可以宣稱說它在每次從0到surface的sorting的時候會有某部分的排序.比如有個array:
12 3 7 8 8 15 89
然後做一次sorting後:
3 7 8 8 12 15 89

然後宣稱12在那次sorting之後已經排好了,所以不用在swap了。
是的 這種情況有可能發生, 但在資料很多的狀況之下省掉的swap的量根本沒有很多。 因為每次都一樣會從第0個element開始做sorting,在直到surface之前都會swap好幾次。
第一次sorting的機率也許是一半 0.5 ,假設有1000個element,
所以第一次sorting的swap次數大約是500,而第二次也許因為第一次sorting已經swap後的影響很棒,所以又省掉了一些,假設是300,於是總共swap800次。接著第三次第四次...直到一千次之前絕對是超過幾千次swap。

這是我用1000個random number (mod 100) 的array去測量swap次數的結果:
swap次數 = 241946
如果這些random number是mod 1000:
swap次數 = 254648

看起來似乎跟data的range差異性不大.

因此這swap的次數太過驚人了,swap要傳入兩個arguements,做了兩次copy reference的運算,再加上三個assignment的運算,雖然乍看之下不少,但swap本身的次數是非常多的!等於是有24萬次的swap運算。 先不看copy reference的運算次數,光看那assignment的運算有三次的話,總共的次數會有72萬次基本assignment運算!

如果這array是10000個要做sorting呢?它swap的次數是?
答案是:
swap次數25115210
等於光看這樣的結果的話,資料每多一個order, swap的次數就多了兩個order,也就是swap的次數是資料量N的平方倍 O(N^2)
------------------------------------------------------------------------------------------------

個人在分析了這麼誇張的狀況之下,想了一下, 根本這個bubble sorting對於處理量不小的資料是有非常多餘沒有任何意義的運算。因為在找出最大的value之前,我們根本不用在那邊跟小嘍囉周旋!了解嗎?我們只是要直接找出最大的老大是誰,不需要一個一個小嘍囉在那瘋狂的swap。

因此以這樣的想法去想的話,我們在每次surface decrease之前其實只需要做一次swap就可以了,也就是找出最大的那個,然後跟surface那個position swap就好了。 因此這樣一來swap的次數就會等同於資料量的大小。

以下就是根據這想法自行編寫的程式碼:
void big_one_sort(int *a, int size)

{
   // mark the index of the largest value
   unsigned max_ind;
   // mark the largest value
   int max_val;

   unsigned i, surface;
   for (surface = size - 1; surface > 0; --surface) {
      // regard the last one as the max value
      max_ind = surface;
      max_val = a[max_ind];
      // find out the max value and index
      for (i = 0; i < surface; ++i)
         if (a[i] > max_val) {
            max_val = a[i];
            max_ind = i;
         }
      // swap the last one with max one
      swap(a + surface, a + max_ind);
   }/* surface */
}


基本上程式碼和bubble sort是差不多的,只是多了兩個量:
max_val 是為了記錄在某次sorting下的最大值,
max_ind就是這個最大值所對應的index(也就是在這array內的位置)
因此當內圈的i loop跑完之後,我們便會找到除了surface這個element之前的最大的value和其位置
接著我們就直接swap surface的value和max_val 也就是最大的那個值和surface交換

當然也許會問,為什麼i loop不要檢視到surface這個值? 理由在於:我們在i loop之前便已經把
max指定為surface那個值,也就是假定最後的那個是最大的。 接著我們在i loop之後就直接swap最後的那個以及最大的值, 因此會有兩個情形發生:
1. 找到的max_val比surface那個值還大, 那麼swap是沒什麼問題。
2. 找到的max_val比surface那個值還小, 那麼surface無疑就是最大的, 它和它自己swap沒什麼不好,當然也許多餘,但數學上是一樣的 (而且就數學機率上來說這個swap發生的機率= 1/surface 微乎其微)

而且最重要的是如果我們的for loop有檢視到surface,等於是我們至少會多一次運算,就是裡面的那個 if (a[i] > max_val)
這是無可避免的,但是如果我們的for loop並沒有到surface的話,我們少了這次運算,而且那個swap不管是哪種方案也照樣會去執行。
沒有任何理由去把這for loop跑到surface去。


也許你覺得這樣考量非常的龜毛沒必要,但細微的地方有時候常常會是速度上的差距。題外話。


因此最後比較這兩個速度的話...我用time.h的time函數去跑80000這樣大小的random array 速度上
big_one_sort會是bubble_sort的 33/9 = 366.67% 倍, 當然隨著資料量越多,這個差距會更加的恐怖。

2013年7月11日 星期四

bubble sort

// bubble sorting
void bubble_sort(int *a, int size)
{
   int i, surface;
   //surface for putting the largest value after sorting for i loop
   for (surface = size - 1; surface > 0; --surface)
      // sorting like bubble, compare then and
      // switch them if larger bubble is on the left
      // hand side(a[i]), and finally we will get
      // the largest value
      for (i = 0; i < surface; ++i)
         if (a[i] > a[i+1])
            swap(a + i, a + i + 1);
}

// swap two values
void swap(int *a, int *b)
{
   int tmp = *a;
   *a = *b;
   *b = tmp;
}
程式請直接放入這支程式的 int main()上方,接著在那支程式的"func"的位置下面放上
   bubble_sort(b, arry_size);
   printf("The result of bubble sort:\n");
   for (i = 0; i < arry_size; ++i)
      printf("%d ", b[i]);
   puts("");

Bubble sort的演算法的基本原理就像是水中的泡沫一般,比較大的泡沫會不斷的往上爬.我們從最下面的那一個泡沫開始, 如果他比上面的那一個大顆,那麼就讓它往上爬,如果遇到同樣大顆或是比它大顆的,那麼就請留在原地,接著我們就用同樣方式繼續檢視它上面那顆,直到表層. 在code裡面我把水的表層取名叫surface. 這個code其實非常的短, 我們先來從最重要的部份開始講起.
      for (i = 0; i < surface; ++i)
         if (a[i] > a[i+1])
            swap(a + i, a + i + 1);
這個部分是bubble sort的主要原理部分, swap就是直接交換這兩個放入指標的值. 這個迴圈就是從泡泡的最底層一個一個去檢視泡泡的大小a[i]以及a[i+1], 如果底下那顆a[i]比上面那顆a[i+1]大顆,那麼就交換它們的位置,可以把它想成大顆的泡泡往上擠,硬把上面那顆小顆的往下壓. 當然遇到比他塊頭一樣或更大的(a[i] <= a[i+1])就不擠了,當然檢視一樣沒有中斷,就繼續檢視上面這顆,讓他繼續往上擠,照著這樣的方式我們就會檢視到水面,也就是surface這邊了. 

因此照這樣的想法來看的話,我們在這次檢視過後,到最後在水面的必然就是這次檢視中最大顆的,要強調的就是"這次檢視". 因為我們會有好幾次檢視. 所以跑完第一次surface迴圈,我們找到最大顆的並且她因為不斷的往上擠的緣故所以跑到了表層,也就是這個array的最右邊. 接下來的迴圈
   for (surface = size - 1; surface > 0; --surface)
就是把水面往下壓, 因為這次檢視中最大顆的已經跑出水面了, 我們就沒必要再次的去比較它, 我們只需要比較水面底下剩下的那些小嘍囉, 因此就是把水面往下壓,直到剩下那顆泡泡(surface > 0) 至於為什麼不要(surface >= 0)的原因就是因為剩下的那顆理所當然的就是最小的,沒有必要再去比較了. 

因此這個程式到最後array最右邊的就會是最大顆的,接著它的左邊那個就會是第二次檢視中最大顆的,那麼問題來了, 這左邊那顆會比右邊那顆還大嗎? 答案是不會, 因為我們在第一次檢視中選取的就會是最大顆的,不要忘記,最先冒出水面的當然是最大顆的. 因此接著之後冒出來的都必然會相等或是更小,不會更大. 

至於swap這支程式就不多做敘述了, 不在資料結構這邊討論的範疇,這是C語言的pointer, 有興趣的人請自行google pointer以及function的關聯. 基本上這支swap程式就是交換傳入的兩個value而已.

排序前的準備

要做sorting之前,必須把資料儲存起來.
這邊使用了link list是為了能夠動態儲存使用者資料
原因在於我們不能預知使用者到底會輸入多少資料, 當然我們也可以使用c++的vector之類的東西, 只是在此並沒有使用.
而動態儲存至link list之後把它轉成array方便做sorting.
如果不曉得link list之後會介紹,在此只需要把程式碼直接copy過去即可.
其實程式真的不長,只是為了打上說明所以才變得這麼長
在此程式碼不多做說明,因為這只是事前的準備,個人在程式碼部分有打上簡單的說明,當然有些地方因個人英文不好,所以打出來的說明也許不是這麼清楚又或是文法有缺漏.
敬請多加包含.
ps:在此預設了輸入的value是integer.

#include <stdio.h>
#include <stdlib.h>

// structure for link list
typedef struct data_node {
   int data;                    // save data
   struct data_node *next;      // next node
} data_node;

// declare head and tail node pointer
data_node *head, *tail;

// save the data to the tail of the link list
void put_to_tail(int d)
{
   // create a node with data value d
   data_node *node;
   node = (data_node*) malloc(sizeof(data_node));
   node->data = d;

   node->next = NULL; // no follower

   // if this is the first node
   if (head == NULL)
      // then this one is head and also tail
      head = tail = node;
   else { // if there are existing nodes
      // then put that node to the last position
      tail->next = node;
      tail = node;
   }
}

// show each elements of the link list
// and return the number of the elements
unsigned show_list_and_size(void)
{
   if (head == tail)
      error_msg("please put in values");
   // number of the elements of the link list
   unsigned num = 0;
   printf("data list is the following:\n");

   // explorer of the list
   data_node *rush, *prev;
   // beginning position
   rush = head;
   // if it is not the end
   while (rush != NULL) {
      // then continue to explore
      // keep the previous position
      prev = rush;
      // explore the next node
      rush = rush->next;
      // print out the data
      printf("%d ", prev->data);
      // accumulate the number
      num++;
   }
   puts("");
   // return the number of the elements
   return num;
}

// put the data from the link list to array a
void list_to_arry(int *a)
{
   data_node *rush, *prev;

   // beginning position
   rush = head;
   // array index start from 0
   unsigned i = 0;

   // if it is not the end
   while (rush != NULL) {
      // then continue to explore
      // keep the previous position
      prev = rush;
      // put that data to the arrary
      a[i++] = prev->data;      
      // explore the next node
      rush = rush->next;
    }
}

// print out the error message
void error_msg(char *msg)
{
   fprintf(stderr, "%s\n", msg);
   exit(1);
}

int main(void)
{
   // initialize head and tail as NULL
   head = NULL;
   tail = NULL;

   printf("Enter the numbers:");
   int data;

   // if user is still input the data,
   // then put it to the tail of the link list
   while (scanf("%d", &data) != EOF)
      put_to_tail(data);

   // show all the elements and get the size
   unsigned arry_size = show_list_and_size();

   // allocate the arrary
   int *a = (int*) malloc(sizeof(int) * arry_size);
   // put the data from the list to that array
   list_to_arry(a);


   // allocate another array for showing the sorted result
   int *b = (int*) malloc(sizeof(int) * arry_size);
   int i;

   // initilize it as the original array
   for (i = 0; i < arry_size; ++i)
      b[i] = a[i];
   ///////////////////////////////
   //        func               //

   free(a);
   free(b);

   return 0;
}