標籤

顯示具有 array 標籤的文章。 顯示所有文章
顯示具有 array 標籤的文章。 顯示所有文章

2013年8月12日 星期一

陣列-sparse matrix的轉置

void trans_spm(spm *orig, spm *trans)
{
   // sofar is the index for trans
   int col, i, sofar = 1;
   // initialize matrix info
   int TotalCol = trans[0].row = orig[0].col;
   trans[0].col = orig[0].row;
   int TotalElements = trans[0].val = orig[0].val;

   // loop with column index
   for (col = 0; col < TotalCol; col++)
      // search all over the elements
      // for the same column
      for (i = 1; i <= TotalElements; ++i)
         if (orig[i].col == col) {
            // add the same column elements to trans
            trans[sofar].row = col;
            trans[sofar].col = orig[i].row;
            trans[sofar].val = orig[i].val;
            sofar++;
         }
}

sparse matrix的轉置矩陣,其做法的原理很簡單,首先的資料部分row 和column數目對調就是轉置的結果,總element數目不變。接著再以column為主,從原先的矩陣依照(column, row)的順序把每一個element抽出來放到轉置矩陣去。

因此首先有dummy index:
   int col, i, sofar = 1;
col是給column用的index,為了loop in column。而i則是跑全部的element。sofar則是記錄目前在轉置矩陣上的index。
接著把矩陣資料排好給轉置矩陣後,就開始依據(column, row)的順序把element從原矩陣中抽出。
  for (col = 0; col < TotalCol; col++)
因為我們要排的是轉置的矩陣,因此原先的矩陣排列方式是(row, column),那麼現在的轉置矩陣就會是原矩陣的相反(column, row)。首先的迴圈就run over all column的index去依據column大小的順序去排列轉置矩陣。
      for (i = 1; i <= TotalElements; ++i)
         if (orig[i].col == col) {
            // add the same column elements to trans
            trans[sofar].row = col;
            trans[sofar].col = orig[i].row;
            trans[sofar].val = orig[i].val;
            sofar++;
         }
我們便每次都從第一個element開始排列順序,我們每一次一個column都會從頭從原矩陣的每個element找起,找的目標就是當下的column,比如我們開始排列第0個column,那麼我們便從原矩陣找column為0的。而原矩陣的row和column早就已經排列過了,因此我們就按照element的順序去找起即可,而不需要考慮row的順序。當我們找到當下的column的時候,便把這個element assign給轉置矩陣,當然row和column是要對調的,而值是相同的。因此按照這樣的邏輯下去這個轉置矩陣變這樣完成。

2013年8月5日 星期一

陣列-sparse matrix的加法

// add sparse matrix c = a + b
void add_spm(spm *c, spm *a, spm *b)
{
   int ia = 1; // index for a
   int ib = 1; // index for b
   int ic = 1; // index for c

   // while both index are not yet reach the end
   while (ia <= a[0].val && ib <= b[0].val)
      // if a position > b position, then assign b to c
      if (a[ia].row > b[ib].row ||
          a[ia].row == b[ib].row && a[ia].col > b[ib].col)
         c[ic++] = b[ib++];
      // if a position < b position, then assign a to c
      else if (a[ia].row < b[ib].row ||
               a[ia].row == b[ib].row && a[ia].col < b[ib].col)
         c[ic++] = a[ia++];
      // if a position = b position, c = a+c
      else {
         // we only save non-zero elements
         int sum = a[ia].val + b[ib].val;
         if (sum != 0) {
            c[ic].row = a[ia].row;
            c[ic].col = a[ia].col;
            c[ic].val = sum;
            ic++;
         }
         ia++;
         ib++;
      }
   /* end while */

   // assign the rest elements a OR b to c
   while (ia <= a[0].val)
      c[ic++] = a[ia++];
   while (ib <= b[0].val)
      c[ic++] = b[ib++];
   // assgin the matrix info to c
   c[0].row = a[0].row;
   c[0].col = a[0].col;
   c[0].val = ic - 1;
}
加法其實很簡單,有overlap的elements就是直接加,沒有overlap的部分就是看row和column的位置,先判斷row在判斷column,row/column數字少的就直接遞給c。接著如果其中一方a或b已經把它自己的element都遞給c的時候,那麼另外一方就直接把剩餘的element都遞給c,因為已經不會有overlap的部分了。
  while (ia <= a[0].val && ib <= b[0].val)
首先這個迴圈,當a以及b皆有element要加的時候就會持續的做以下的加法。

      if (a[ia].row > b[ib].row ||
          a[ia].row == b[ib].row && a[ia].col > b[ib].col)
         c[ic++] = b[ib++];
在while迴圈仍有效的前提之下,開始判斷有沒有overlap的element。這邊就判定不是overlap,並且a的目前index的位置是在b之後的,所以我們就直接把b的element assign給c,並且各自的index往下一個element前進,而接著的else if則是相同的邏輯,只是是判定b的index在a之後。
      else {
         // we only save non-zero elements
         int sum = a[ia].val + b[ib].val;
         if (sum != 0) {
            c[ic].row = a[ia].row;
            c[ic].col = a[ia].col;
            c[ic].val = sum;
            ic++;
         }
接著最後則是a和b是overlap的element,把他們的值加上去之後assign給c,把c的index往後移之後
         ia++;
         ib++;
馬上把a和b的index都往後移,最後最外面的while迴圈跑完之後,有三種情形:1.(a沒有剩下了,b還有) 2.(b沒有剩下了,a還有) 3.(都沒有剩下了) 但不論是怎樣的情形,我們都會跑:
   while (ia <= a[0].val)
      c[ic++] = a[ia++];
   while (ib <= b[0].val)
      c[ic++] = b[ib++];
因為即使是情形1. a沒有剩下的element,我們跑while(ia <= a[0].val)它不會有element要執行,因此這邊是不會跑迴圈的,只會跑下面那個b,而情形2和情形3亦然是如此。最後我們再把這矩陣的資料assign給c就完成了。

2013年7月31日 星期三

陣列-sparse matrix的generation

// sparse matrix structure
typedef struct spm {
   int row; // row position
   int col; // column position
   int val; // value
}spm;
// generate a sparse matrix from a
void generate_spm(int *a, spm *s)
{
   // row, column index and counting
   // for nonzero elements
   int r, c, count = 0;
   // initialize first row
   s[0].row = M;
   s[0].col = N;

   for (r = 0; r < M; r++)
      for (c = 0; c < N; c++)
         // if there is an nonzero element
         if (a[r*N + c] != 0) {
            count++;
            s[count].row = r;
            s[count].col = c;
            s[count].val = a[r*N + c];
         }
   s[0].val = count;
}
int main()
{
   int i = 0;
   printf("Enter the matrix elements:\n");
   unsigned size = 0;
   int val;
   while (scanf("%d", &val) != EOF)
      put_to_tail(val);
   size = show_list_and_size();

   int *a = (int*)malloc(sizeof(int)*size);
   list_to_array(a);

   // input M
   FILE *file_in_M;
   file_in_M = fopen("M_size", "r");
   fscanf(file_in_M, "%d", &M);
   if (size % M != 0) {
      fprintf(stderr,"size % M != 0\n");
      exit(1);
   }
   fclose(file_in_M);
   N = size / M;


   int count_non_zero = 0;
   // counting nonzero elements of a
   for (i = 0; i < size; ++i)
      if (a[i] != 0)
         count_non_zero++;
   // allocating s with size of nonzero elements+1
   spm *s = (spm*) malloc(sizeof(spm)*(count_non_zero+1));

   // generate sparse matrix a to s
   generate_spm(a, s);
   //print out the result
   printf("   row  col  val\n---------------\n");
   for (i = 0; i <= count_non_zero; ++i)
      printf("%5i%5i%5i\n", s[i].row, s[i].col, s[i].val);
   return 0;
}   
首先這程式要能work,就先要把sorting的準備的這些function放進去。

這是陣列的一個應用。把一個sparse matrix(也就是matrix的elements有非常多都是0)轉成一個更精簡的matrix,使得:
1. 比較省記憶體空間
2. 處理矩陣運算速度能夠更快

有使用matlab的人如果有研究過的話,應該會曉得這軟體在處理sparse matrix是採用精簡matrix。你可以把它看成是一個對應表格。首先這種表格第一行就是這個矩陣的相關資料。如果這矩陣是M*N的大小,並且有100個非零項的話,那麼這個精簡表格的第一行會是如此的表示:
M   N   100。
接下來的每一行都會是根據非零項的相關位置來對應其值:
原矩陣:
0  3  0  0  1
0  2  1  0  3
7  0  9  0  1
表格矩陣:
3  5  8
0  1  3
0  4  1
1  1  2
1  2  1
1  4  3
2  0  7
2  2  9
2  4  1

看起來好像似乎資料量變多,但其實真正的sparse matrix的非零項會是更多的,假設有一個1000* 1000的matrix,非零項只有10,那麼資料型態若是int的話,原本的矩陣所需要的記憶體空間量將會是sizeof(int) * 1000*1000 但使用這種精簡矩陣的空間量將會大幅減少為sizeof(int)*(10+1) 減少了機乎是100,000倍的空間量。flop次數也更不用講了。

因此先從大方向的main開始講解其架構:

put_to_tail會從io那邊得來的資料放入link list中。接著show_list_and_size會show出link list的資料,來確認放入link list的資料是對的,並同時會回傳資料的筆數。而以此資料的筆數我們就能夠memory allocate資料的大小,並使用list_to_array把link list放入array a以內。

因為筆者的研究本身都是巨量資料,因此是需要memory allocate在記憶體中的,所以這種方式是用一維的陣列來儲存矩陣的資料。因此需要矩陣的input的size M*N。所以開了檔案,input進去M之後check看看這是不是合法的矩陣便能夠了解大小N。

而在generate_spm內我們把矩陣a把它弄成只收集非零項的資料,但即使如此我們還是必須要事先知道大小,因此我們需要一個變數count_non_zero去先了解矩陣有幾個非零項,然後才去mem. allocate我們要的精簡後的matrix s。

把sparse matrix a轉成s後就把s的elements印出來確認無誤。以上就是主要的操作流程。

所以我們需要自訂資料型態spm(sparse matrix的簡寫),有三個elements:
// sparse matrix structure
typedef struct spm {
   int row; // row position
   int col; // column position
   int val; // value
}spm;
這三個資料就會是我們精簡表格每一個row各自代表的資料,你可以把這矩陣想像成類似坐標那樣,我們需要一個點的資料就必須知道他的坐標,row就像是y坐標,而col就類似x坐標,而val就是這個點的值。這個資料型態將會儲存非0項的位置和其值。

接著generate_spm就將會把sparse matrx(一個1D的array)轉換成表格s。首先這個表格的第一行資料將會是這個sparse matrix的M,N,以及非零項的個數,於是我們就先輸入M和N:

   s[0].row = M;
   s[0].col = N;
接著當然我們也可以從外面的count_non_zero來輸入s[0].val。只是目前這邊沒有這樣子做。

然後r,c迴圈去做a的每一個elements的尋訪,找出非零項之後各自輸入到s上:
   for (r = 0; r < M; r++)
      for (c = 0; c < N; c++)
         // if there is an nonzero element
         if (a[r*N + c] != 0) {
            count++;
            s[count].row = r;
            s[count].col = c;
            s[count].val = a[r*N + c];
         }
最後輸入counting的大小,精簡矩陣s就這樣轉換完成了。