Example codes
and tutorials
The (Not-so) Quick Guide
List of tutorials/demo codes
Single-Physics Problems
Poisson
Adaptivity illustrated for Poisson
Advection-Diffusion
Unsteady heat equation
Linear wave equation
The Young-Laplace equation
Navier-Stokes
Free-surface Navier-Stokes
Axisymmetric Navier-Stokes
Solid mechanics
Beam structures
Shell structures
Multi-physics Problems
Fluid-structure interaction
Boussinesq convection
Steady thermoelasticity
Methods-based example codes and tutorials
Mesh generation
Linear solvers and preconditioners
Visualisation of the results
Parallel processing
How to write a new element
How to write a new refineable element
Default nonlinear solvers -- the sequence of action functions
...
Documentation
FE theory and top-down discussion of the data structure
The (Not-so) Quick Guide
Comprehensive bottom-up discussion of the data structure
List of available structured and unstructured meshes
Linear solvers and preconditioners
Visualisation of the results
Parallel processing
Coding conventions and C++ style
Creating documentation
Optimisation - robustness vs. "raw speed"
Linear vs. nonlinear problems
Storing shape functions
Changing the default "full" integration scheme
Disabling the ALE formulation of unsteady equations
C vs. C++ output
Different sparse assembly techniques and the STL memory pool
Publications
Publications
Talks
Journal publications
Theses
Picture show
Download
Copyright
Download/installation instructions
Download page
FAQ & Contact
FAQ
Change log
Bugs and other known problems
Completeness of the library & our "To-Do List"
Contact the developers
Get involved

 


Beta release!

Please note that the library has not been "officially" released. While we continue to work on the documentation, these web pages are likely to contain broken links and documents in draft form. Please send an email to

oomph-lib AT maths DOT man DOT ac DOT uk

if you wish to be informed of the library's "official" release.

matrices.cc

Go to the documentation of this file.
00001 //LIC// ====================================================================
00002 //LIC// This file forms part of oomph-lib, the object-oriented, 
00003 //LIC// multi-physics finite-element library, available 
00004 //LIC// at http://www.oomph-lib.org.
00005 //LIC// 
00006 //LIC//           Version 0.90. August 3, 2009.
00007 //LIC// 
00008 //LIC// Copyright (C) 2006-2009 Matthias Heil and Andrew Hazel
00009 //LIC// 
00010 //LIC// This library is free software; you can redistribute it and/or
00011 //LIC// modify it under the terms of the GNU Lesser General Public
00012 //LIC// License as published by the Free Software Foundation; either
00013 //LIC// version 2.1 of the License, or (at your option) any later version.
00014 //LIC// 
00015 //LIC// This library is distributed in the hope that it will be useful,
00016 //LIC// but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //LIC// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //LIC// Lesser General Public License for more details.
00019 //LIC// 
00020 //LIC// You should have received a copy of the GNU Lesser General Public
00021 //LIC// License along with this library; if not, write to the Free Software
00022 //LIC// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00023 //LIC// 02110-1301  USA.
00024 //LIC// 
00025 //LIC// The authors may be contacted at oomph-lib@maths.man.ac.uk.
00026 //LIC// 
00027 //LIC//====================================================================
00028 //Non-inline member functions for the matrix classes
00029 
00030 #include<set> 
00031 #include<map> 
00032 
00033 //oomph-lib headers
00034 #include "matrices.h"
00035 #include "linear_solver.h"
00036 
00037 #ifdef OOMPH_HAS_MPI
00038 #include "mpi.h"
00039 #endif
00040 
00041 namespace oomph
00042 {
00043 
00044 //=========================================================================
00045 /// Namespaces for (Numerical-Recipes-based) functions for
00046 /// eigensolver, based on Jacobi rotations.
00047 //=========================================================================
00048 namespace JacobiEigenSolver
00049 {
00050 
00051  /// Perform one Jacobi rotation on matrix a
00052  inline void rot(DenseDoubleMatrix&a, const double s, const double tau, 
00053                  const unsigned long i, const unsigned long j, 
00054                  const unsigned long k, const unsigned long l)
00055  {
00056   double g,h;
00057   
00058   g=a(i,j);
00059   h=a(k,l);
00060   a(i,j)=g-s*(h+g*tau);
00061   a(k,l)=h+s*(g-h*tau);
00062 
00063  }
00064 
00065 
00066 /// \short Use Jacobi rotations to determine eigenvalues and eigenvectors of 
00067 /// matrix a. d[i]=i-th eigenvalue; v(i,j)=i-th component of j-th eigenvector
00068 /// (note that this is the transpose of what we'd like to have...);
00069 /// nrot=number of rotations used. 
00070  void jacobi(DenseDoubleMatrix& a, Vector<double>& d, 
00071              DenseDoubleMatrix& v, unsigned long& nrot)
00072  {
00073 #ifdef PARANOID
00074   // Check Matrix a is square
00075   if (a.ncol()!=a.nrow())
00076    {
00077     throw OomphLibError(
00078      "This matrix is not square, the matrix MUST be square!",
00079      "JacobiEigenSolver::jacobi()",
00080      OOMPH_EXCEPTION_LOCATION);
00081    }
00082 #endif
00083  
00084   // If matrix v is wrong size, correct it!
00085   if (v.ncol()!=a.ncol() || v.nrow()!=a.nrow())
00086    {
00087     v.resize(a.nrow(),a.nrow(),0.0);
00088    }
00089 
00090   unsigned long i,j,ip,iq;
00091   double tresh,theta,tau,t,sm,s,h,g,c;
00092  
00093   unsigned long n=d.size();
00094   Vector<double> b(n);
00095   Vector<double> z(n);
00096   for (ip=0;ip<n;ip++) {
00097    for (iq=0;iq<n;iq++) v(ip,iq)=0.0;
00098    v(ip,ip)=1.0;
00099   }
00100   for (ip=0;ip<n;ip++) {
00101    b[ip]=d[ip]=a(ip,ip);
00102    z[ip]=0.0;
00103   }
00104   nrot=0;
00105   for (i=1;i<=50;i++) {
00106    sm=0.0;
00107    for (ip=0;ip<n-1;ip++) {
00108     for (iq=ip+1;iq<n;iq++)
00109      sm += std::abs(a(ip,iq));
00110    }
00111    if (sm == 0.0)
00112     return;
00113    if (i < 4)
00114     tresh=0.2*sm/(n*n);
00115    else
00116     tresh=0.0;
00117    for (ip=0;ip<n-1;ip++) {
00118     for (iq=ip+1;iq<n;iq++) {
00119      g=100.0*std::abs(a(ip,iq));
00120      if (i > 4 && (std::abs(d[ip])+g) == std::abs(d[ip])
00121          && (std::abs(d[iq])+g) == std::abs(d[iq]))
00122       a(ip,iq)=0.0;
00123      else if (std::abs(a(ip,iq)) > tresh) {
00124       h=d[iq]-d[ip];
00125       if ((std::abs(h)+g) == std::abs(h))
00126        t=(a(ip,iq))/h;
00127       else {
00128        theta=0.5*h/(a(ip,iq));
00129        t=1.0/(std::abs(theta)+std::sqrt(1.0+theta*theta));
00130        if (theta < 0.0) t = -t;
00131       }
00132       c=1.0/std::sqrt(1+t*t);
00133       s=t*c;
00134       tau=s/(1.0+c);
00135       h=t*a(ip,iq);
00136       z[ip] -= h;
00137       z[iq] += h;
00138       d[ip] -= h;
00139       d[iq] += h;
00140       a(ip,iq)=0.0;
00141       for (j=0;j<ip;j++)
00142        rot(a,s,tau,j,ip,j,iq);
00143       for (j=ip+1;j<iq;j++)
00144        rot(a,s,tau,ip,j,j,iq);
00145       for (j=iq+1;j<n;j++)
00146        rot(a,s,tau,ip,j,iq,j);
00147       for (j=0;j<n;j++)
00148        rot(v,s,tau,j,ip,j,iq);
00149       ++nrot;
00150      }
00151     }
00152    }
00153    for (ip=0;ip<n;ip++) {
00154     b[ip] += z[ip];
00155     d[ip]=b[ip];
00156     z[ip]=0.0;
00157    }
00158   }
00159   throw OomphLibError(
00160    "Too many iterations in routine jacobi",
00161    "JacobiEigenSolver::jacobi()",
00162    OOMPH_EXCEPTION_LOCATION);
00163  }
00164 
00165 }
00166 
00167 /////////////////////////////////////////////////////////////////////
00168 /////////////////////////////////////////////////////////////////////
00169 /////////////////////////////////////////////////////////////////////
00170 
00171 
00172 //============================================================================
00173 /// Complete LU solve (overwrites RHS with solution). This is the
00174 /// generic version which should not need to be over-written.
00175 //============================================================================
00176 void DoubleMatrixBase::solve(DoubleVector &rhs)
00177 {
00178 #ifdef PARANOID
00179  if(Linear_solver_pt==0)
00180   {
00181    throw OomphLibError("Linear_solver_pt not set in matrix",
00182                        "DoubleMatrixBase::solve()",
00183                        OOMPH_EXCEPTION_LOCATION);
00184   }
00185 #endif
00186 
00187  // Copy rhs vector into local storage so it doesn't get overwritten
00188  // if the linear solver decides to initialise the solution vector, say,
00189  // which it's quite entitled to do!
00190  DoubleVector actual_rhs(rhs);
00191 
00192  //Use the linear algebra interface to the linear solver
00193  Linear_solver_pt->solve(this,actual_rhs,rhs);
00194 }
00195 
00196 //============================================================================
00197 /// Complete LU solve (Nothing gets overwritten!). This generic
00198 /// version should never need to be overwritten
00199 //============================================================================
00200 void DoubleMatrixBase::solve(const DoubleVector &rhs, 
00201                              DoubleVector &soln)
00202 {
00203 #ifdef PARANOID
00204  if(Linear_solver_pt==0)
00205   {
00206    throw OomphLibError("Linear_solver_pt not set in matrix",
00207                        "DoubleMatrixBase::solve()",
00208                        OOMPH_EXCEPTION_LOCATION);
00209   }
00210 #endif
00211  //Use the linear algebra interface to the linear solver
00212  Linear_solver_pt->solve(this,rhs,soln);
00213 }
00214 
00215 //============================================================================
00216 /// Complete LU solve (overwrites RHS with solution). This is the
00217 /// generic version which should not need to be over-written.
00218 //============================================================================
00219 void DoubleMatrixBase::solve(Vector<double> &rhs)
00220 {
00221 #ifdef PARANOID
00222  if(Linear_solver_pt==0)
00223   {
00224    throw OomphLibError("Linear_solver_pt not set in matrix",
00225                        "DoubleMatrixBase::solve()",
00226                        OOMPH_EXCEPTION_LOCATION);
00227   }
00228 #endif
00229 
00230  // Copy rhs vector into local storage so it doesn't get overwritten
00231  // if the linear solver decides to initialise the solution vector, say,
00232  // which it's quite entitled to do!
00233  Vector<double> actual_rhs(rhs);
00234 
00235  //Use the linear algebra interface to the linear solver
00236  Linear_solver_pt->solve(this,actual_rhs,rhs);
00237 }
00238 
00239 //============================================================================
00240 /// Complete LU solve (Nothing gets overwritten!). This generic
00241 /// version should never need to be overwritten
00242 //============================================================================
00243 void DoubleMatrixBase::solve(const Vector<double> &rhs,
00244                              Vector<double> &soln)
00245 {
00246 #ifdef PARANOID
00247  if(Linear_solver_pt==0)
00248   {
00249    throw OomphLibError("Linear_solver_pt not set in matrix",
00250                        "DoubleMatrixBase::solve()",
00251                        OOMPH_EXCEPTION_LOCATION);
00252   }
00253 #endif
00254  //Use the linear algebra interface to the linear solver
00255  Linear_solver_pt->solve(this,rhs,soln);
00256 }
00257 
00258 
00259 ////////////////////////////////////////////////////////////////////////
00260 ////////////////////////////////////////////////////////////////////////
00261 ////////////////////////////////////////////////////////////////////////
00262 
00263 //===============================================================
00264 /// Constructor, set the default linear solver to be the DenseLU 
00265 /// solver
00266 //===============================================================
00267 DenseDoubleMatrix::DenseDoubleMatrix(): DenseMatrix<double>()
00268 {
00269  Linear_solver_pt = Default_linear_solver_pt = new DenseLU;
00270 }
00271 
00272 //==============================================================
00273 /// Constructor to build a square n by n matrix.
00274 /// Set the default linear solver to be DenseLU
00275 //==============================================================
00276 DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long &n) : 
00277  DenseMatrix<double>(n)
00278 {
00279  Linear_solver_pt = Default_linear_solver_pt = new DenseLU;
00280 }
00281  
00282 
00283 //=================================================================
00284 /// Constructor to build a matrix with n rows and m columns.
00285 /// Set the default linear solver to be DenseLU
00286 //=================================================================
00287  DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long &n, 
00288                                       const unsigned long &m) :
00289   DenseMatrix<double>(n,m)
00290 {
00291  Linear_solver_pt = Default_linear_solver_pt = new DenseLU;
00292 }
00293 
00294 //=====================================================================
00295 /// Constructor to build a matrix with n rows and m columns,
00296 /// with initial value initial_val
00297 /// Set the default linear solver to be DenseLU
00298 //=====================================================================
00299 DenseDoubleMatrix::DenseDoubleMatrix(const unsigned long &n, 
00300                                      const unsigned long &m,
00301                                      const double &initial_val) :
00302  DenseMatrix<double>(n,m,initial_val) 
00303 {
00304  Linear_solver_pt = Default_linear_solver_pt = new DenseLU;
00305 }
00306 
00307 //=======================================================================
00308 /// Destructor delete the default linear solver
00309 //======================================================================
00310 DenseDoubleMatrix::~DenseDoubleMatrix()
00311 {
00312  //Delete the default linear solver
00313  delete Default_linear_solver_pt;
00314 }
00315 
00316 //============================================================================
00317 /// LU decompose a matrix, by using the default linear solver
00318 /// (DenseLU)
00319 //============================================================================
00320 void DenseDoubleMatrix::ludecompose()
00321 {
00322  //Use the default (DenseLU) solver to ludecompose the matrix
00323  static_cast<DenseLU*>(Default_linear_solver_pt)->factorise(this);
00324 }
00325 
00326 
00327 //============================================================================
00328 ///  Back substitute an LU decomposed matrix.
00329 //============================================================================
00330 void DenseDoubleMatrix::lubksub(DoubleVector &rhs)
00331 {
00332  //Use the default (DenseLU) solver to perform the backsubstitution
00333  static_cast<DenseLU*>(Default_linear_solver_pt)->backsub(rhs,rhs);
00334 }
00335 
00336 //============================================================================
00337 ///  Back substitute an LU decomposed matrix.
00338 //============================================================================
00339 void DenseDoubleMatrix::lubksub(Vector<double> &rhs)
00340 {
00341  //Use the default (DenseLU) solver to perform the backsubstitution
00342  static_cast<DenseLU*>(Default_linear_solver_pt)->backsub(rhs,rhs);
00343 }
00344 
00345 
00346 //============================================================================
00347 ///  Determine eigenvalues and eigenvectors, using
00348 /// Jacobi rotations. Only for symmetric matrices. Nothing gets overwritten!
00349 /// - \c eigen_vect(i,j) = j-th component of i-th eigenvector.
00350 /// - \c eigen_val[i] is the i-th eigenvalue; same ordering as in eigenvectors
00351 //============================================================================
00352 void DenseDoubleMatrix::eigenvalues_by_jacobi(Vector<double> & eigen_vals, 
00353                                               DenseMatrix<double> &eigen_vect)
00354  const
00355 {
00356 #ifdef PARANOID
00357  // Check Matrix is square
00358  if (N!=M)
00359   {
00360    throw OomphLibError(
00361     "This matrix is not square, the matrix MUST be square!",
00362     "DenseDoubleMatrix::eigenvalues_by_jacobi()",
00363     OOMPH_EXCEPTION_LOCATION);
00364   }
00365 #endif
00366  // Make a copy of the matrix & check that it's symmetric
00367 
00368  // Check that the sizes of eigen_vals and eigen_vect are correct. If not 
00369  // correct them.
00370  if (eigen_vals.size()!=N) 
00371   { 
00372    eigen_vals.resize(N); 
00373   }
00374  if (eigen_vect.ncol()!=N || eigen_vect.nrow()!=N) 
00375   { 
00376    eigen_vect.resize(N); 
00377   }
00378  
00379  DenseDoubleMatrix working_matrix(N);
00380  for (unsigned long i=0;i<N;i++)
00381   {
00382    for (unsigned long j=0;j<M;j++)
00383     {
00384 #ifdef PARANOID
00385      if (Matrixdata[M*i+j]!=Matrixdata[M*j+i])
00386       {
00387        throw OomphLibError(
00388         "Matrix needs to be symmetric for eigenvalues_by_jacobi()",
00389         "DenseDoubleMatrix::eigenvalues_by_jacobi()",
00390         OOMPH_EXCEPTION_LOCATION);
00391       }
00392 #endif
00393      working_matrix(i,j)=(*this)(i,j);
00394     }
00395   }
00396  
00397  DenseDoubleMatrix aux_eigen_vect(N);
00398  
00399  // Call Numerical recipies 
00400  unsigned long nrot;
00401  JacobiEigenSolver::jacobi(working_matrix, eigen_vals, aux_eigen_vect, 
00402                            nrot);
00403  // Copy across (and transpose)
00404  for (unsigned long i=0;i<N;i++)
00405   {
00406    for (unsigned long j=0;j<M;j++)
00407     {
00408      eigen_vect(i,j)=aux_eigen_vect(j,i);
00409     }
00410   }
00411 }
00412 
00413 
00414 //============================================================================
00415 ///  Multiply the matrix by the vector x: soln=Ax
00416 //============================================================================
00417 void DenseDoubleMatrix::multiply(const DoubleVector &x, DoubleVector &soln)
00418 {
00419 #ifdef PARANOID
00420  // Check to see if x.size() = ncol().
00421  if (x.nrow()!=this->ncol())
00422   {
00423    std::ostringstream error_message_stream;
00424    error_message_stream 
00425     << "The x vector is not the right size. It is " << x.nrow() 
00426     << ", it should be " << this->ncol() << std::endl;
00427    throw OomphLibError(error_message_stream.str(),
00428                        "DenseDoubleMatrix::multiply()",
00429                        OOMPH_EXCEPTION_LOCATION);
00430   }
00431  // check that x is not distributed
00432  if (x.distributed())
00433   {
00434    std::ostringstream error_message_stream;
00435    error_message_stream 
00436     << "The x vector cannot be distributed for DenseDoubleMatrix "
00437     << "matrix-vector multiply" << std::endl;
00438    throw OomphLibError(error_message_stream.str(),
00439                        "DenseDoubleMatrix::multiply()",
00440                        OOMPH_EXCEPTION_LOCATION);
00441   }
00442  // if soln is setup...
00443  if (soln.distribution_setup())
00444   {
00445    // check that soln is not distributed
00446    if (soln.distributed())
00447     {
00448      std::ostringstream error_message_stream;
00449      error_message_stream 
00450       << "The x vector cannot be distributed for DenseDoubleMatrix "
00451       << "matrix-vector multiply" << std::endl;
00452      throw OomphLibError(error_message_stream.str(),
00453                          "DenseDoubleMatrix::multiply()",
00454                          OOMPH_EXCEPTION_LOCATION);
00455     }
00456    if (soln.nrow() != this->nrow())
00457     {
00458      std::ostringstream error_message_stream;
00459      error_message_stream 
00460       << "The soln vector is setup and therefore must have the same "
00461       << "number of rows as the matrix";
00462      throw OomphLibError(error_message_stream.str(),
00463                          "DenseDoubleMatrix::multiply()",
00464                          OOMPH_EXCEPTION_LOCATION);
00465     }
00466    if (*x.distribution_pt()->communicator_pt() != 
00467        *soln.distribution_pt()->communicator_pt())
00468     {
00469      std::ostringstream error_message_stream;
00470      error_message_stream 
00471       << "The soln vector and the x vector must have the same communicator"
00472       << std::endl;
00473      throw OomphLibError(error_message_stream.str(),
00474                          "DenseDoubleMatrix::multiply()",
00475                          OOMPH_EXCEPTION_LOCATION);
00476     }
00477   }
00478 #endif
00479 
00480  // if soln is not setup then setup the distribution
00481  if (!soln.distribution_setup())
00482   {
00483    LinearAlgebraDistribution* dist_pt = 
00484     new LinearAlgebraDistribution(x.distribution_pt()->communicator_pt(),
00485                                   this->nrow(),false);
00486    soln.build(dist_pt,0.0);
00487    delete dist_pt;
00488   }
00489  soln.initialise(0.0);
00490 
00491  // Multiply the matrix A, by the vector x 
00492  double* x_pt = x.values_pt();
00493  double* soln_pt = soln.values_pt();
00494  for (unsigned long i=0;i<N;i++)
00495   {
00496    for (unsigned long j=0;j<M;j++)
00497     {
00498      soln_pt[i] += Matrixdata[M*i+j]*x_pt[j];
00499     }
00500   }
00501 }
00502 
00503 
00504 //=================================================================
00505 /// Multiply the transposed matrix by the vector x: soln=A^T x
00506 //=================================================================
00507 void DenseDoubleMatrix::multiply_transpose(const DoubleVector &x, 
00508                                            DoubleVector &soln)
00509 {
00510 #ifdef PARANOID
00511  // Check to see if x.size() = ncol().
00512  if (x.nrow()!=this->nrow())
00513   {
00514    std::ostringstream error_message_stream;
00515    error_message_stream 
00516     << "The x vector is not the right size. It is " << x.nrow() 
00517     << ", it should be " << this->nrow() << std::endl;
00518    throw OomphLibError(error_message_stream.str(),
00519                        "DenseDoubleMatrix::multiply()",
00520                        OOMPH_EXCEPTION_LOCATION);
00521   }
00522  // check that x is not distributed
00523  if (x.distributed())
00524   {
00525    std::ostringstream error_message_stream;
00526    error_message_stream 
00527     << "The x vector cannot be distributed for DenseDoubleMatrix "
00528     << "matrix-vector multiply" << std::endl;
00529    throw OomphLibError(error_message_stream.str(),
00530                        "DenseDoubleMatrix::multiply()",
00531                        OOMPH_EXCEPTION_LOCATION);
00532   }
00533  // if soln is setup...
00534  if (soln.distribution_setup())
00535   {
00536    // check that soln is not distributed
00537    if (soln.distributed())
00538     {
00539      std::ostringstream error_message_stream;
00540      error_message_stream 
00541       << "The x vector cannot be distributed for DenseDoubleMatrix "
00542       << "matrix-vector multiply" << std::endl;
00543      throw OomphLibError(error_message_stream.str(),
00544                          "DenseDoubleMatrix::multiply()",
00545                          OOMPH_EXCEPTION_LOCATION);
00546     }
00547    if (soln.nrow() != this->ncol())
00548     {
00549      std::ostringstream error_message_stream;
00550      error_message_stream 
00551       << "The soln vector is setup and therefore must have the same "
00552       << "number of columns as the matrix";
00553      throw OomphLibError(error_message_stream.str(),
00554                          "DenseDoubleMatrix::multiply()",
00555                          OOMPH_EXCEPTION_LOCATION);
00556     }
00557    if (*soln.distribution_pt()->communicator_pt() != 
00558        *x.distribution_pt()->communicator_pt())
00559     {
00560      std::ostringstream error_message_stream;
00561      error_message_stream 
00562       << "The soln vector and the x vector must have the same communicator"
00563       << std::endl;
00564      throw OomphLibError(error_message_stream.str(),
00565                          "DenseDoubleMatrix::multiply()",
00566                          OOMPH_EXCEPTION_LOCATION);
00567     }
00568   }
00569 #endif
00570 
00571  // if soln is not setup then setup the distribution
00572  if (!soln.distribution_setup())
00573   {
00574    LinearAlgebraDistribution* dist_pt = 
00575     new LinearAlgebraDistribution(x.distribution_pt()->communicator_pt(),
00576                                   this->ncol(),false);
00577    soln.build(dist_pt,0.0);
00578    delete dist_pt;
00579   }
00580 
00581  // Initialise the solution
00582  soln.initialise(0.0);
00583 
00584  // Matrix vector product
00585  double* soln_pt = soln.values_pt();
00586  double* x_pt = x.values_pt();
00587  for (unsigned long i=0;i<N;i++)
00588   {  
00589    for (unsigned long j=0;j<M;j++)
00590     {
00591      soln_pt[j] += Matrixdata[N*i+j]*x_pt[i];
00592     }
00593   }
00594 }
00595 
00596 
00597 
00598 //=================================================================
00599 /// For every row, find the maximum absolute value of the
00600 /// entries in this row. Set all values that are less than alpha times
00601 /// this maximum to zero and return the resulting matrix in
00602 /// reduced_matrix. Note: Diagonal entries are retained regardless
00603 /// of their size. 
00604 //=================================================================
00605 void DenseDoubleMatrix::matrix_reduction(const double &alpha,
00606                                        DenseDoubleMatrix &reduced_matrix)
00607 {
00608 
00609  reduced_matrix.resize(N,M,0.0);
00610  // maximum value in a row
00611  double max_row;
00612   
00613  // Loop over rows
00614  for(unsigned i=0;i<N;i++)
00615   { 
00616    // Initialise max value in row
00617    max_row=0.0;
00618    
00619    //Loop over entries in columns
00620    for(unsigned long j=0;j<M;j++)
00621     {
00622      // Find max. value in row
00623      if(std::abs( Matrixdata[M*i+j])>max_row)
00624       {
00625        max_row=std::abs( Matrixdata[M*i+j]);
00626       }
00627     }
00628 
00629    // Decide if we need to retain the entries in the row
00630    for(unsigned long j=0;j<M;j++)
00631     {
00632      // If we're on the diagonal or the value is sufficiently large: retain
00633      // i.e. copy across.
00634      if(i==j || std::abs(Matrixdata[M*i+j])>alpha*max_row )
00635       {
00636        reduced_matrix(i,j) =Matrixdata[M*i+j];
00637       }
00638     }
00639   }
00640  
00641 }
00642 
00643 
00644 //=============================================================================
00645 /// Function to multiply this matrix by the DenseDoubleMatrix  matrix_in.
00646 //=============================================================================
00647 void DenseDoubleMatrix::multiply(const DenseDoubleMatrix &matrix_in,
00648                                  DenseDoubleMatrix& result)
00649 {
00650 #ifdef PARANOID
00651  // check matrix dimensions are compatable 
00652  if ( this->ncol() != matrix_in.nrow()  )
00653   {
00654    std::ostringstream error_message;
00655    error_message
00656     << "Matrix dimensions incompatable for matrix-matrix multiplication"
00657     << "ncol() for first matrix:" << this->ncol()
00658     << "nrow() for second matrix: " << matrix_in.nrow();
00659    
00660    throw OomphLibError(error_message.str(),
00661                        "DenseDoubleMatrix::multiply()",
00662                        OOMPH_EXCEPTION_LOCATION);
00663   }
00664 #endif
00665  
00666  // NB N is number of rows!
00667  unsigned long n_row = this->nrow();
00668  unsigned long m_col = matrix_in.ncol();
00669  
00670  // resize and intialize result
00671  result.resize(n_row, m_col, 0.0);
00672  
00673  //clock_t clock1 = clock();
00674  
00675  // do calculation
00676  unsigned long n_col=this->ncol();
00677  for (unsigned long k=0; k<n_col; k++)
00678   {
00679    for (unsigned long i=0; i<n_row; i++)
00680     {
00681      for (unsigned long j=0; j<m_col; j++)
00682       {
00683        result(i,j) += Matrixdata[m_col*i+k] * matrix_in(k,j);
00684       }
00685     }
00686   }
00687 }
00688 
00689 
00690 
00691 ///////////////////////////////////////////////////////////////////////////////
00692 ///////////////////////////////////////////////////////////////////////////////
00693 ///////////////////////////////////////////////////////////////////////////////
00694 
00695 
00696 //=======================================================================
00697 /// \short Default constructor, set the default linear solver and 
00698 /// matrix-matrix multiplication method.
00699 //========================================================================
00700 CCDoubleMatrix::CCDoubleMatrix() : CCMatrix<double>()
00701   {
00702    Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
00703    Matrix_matrix_multiply_method = 2;
00704   }
00705   
00706 //========================================================================
00707  /// \short Constructor: Pass vector of values, vector of row indices,
00708  /// vector of column starts and number of rows (can be suppressed
00709  /// for square matrices). Number of nonzero entries is read
00710  /// off from value, so make sure the vector has been shrunk
00711  /// to its correct length.
00712 //=======================================================================
00713  CCDoubleMatrix::CCDoubleMatrix(const Vector<double>& value,
00714                                 const Vector<int>& row_index,
00715                                 const Vector<int>& column_start,
00716                                 const unsigned long &n,
00717                                 const unsigned long &m) :
00718   CCMatrix<double>(value,row_index,column_start,n,m)
00719   {
00720    Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
00721    Matrix_matrix_multiply_method = 2;
00722   }
00723 
00724  /// Destructor: delete the default linaer solver
00725  CCDoubleMatrix::~CCDoubleMatrix() {delete Default_linear_solver_pt;}
00726 
00727 
00728 //===================================================================
00729 /// Perform LU decomposition. Return the sign of the determinant
00730 //===================================================================
00731 void CCDoubleMatrix::ludecompose()
00732 {
00733  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->factorise(this);
00734 }
00735 
00736 //===================================================================
00737 /// Do the backsubstitution
00738 //===================================================================
00739 void CCDoubleMatrix::lubksub(DoubleVector &rhs)
00740 {
00741  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->backsub(rhs,rhs);
00742 }
00743 
00744 //===================================================================
00745 ///  Multiply the matrix by the vector x
00746 //===================================================================
00747 void CCDoubleMatrix::multiply(const DoubleVector &x, DoubleVector &soln)
00748 {
00749 #ifdef PARANOID
00750  // Check to see if x.size() = ncol().
00751  if (x.nrow()!=this->ncol())
00752   {
00753    std::ostringstream error_message_stream;
00754    error_message_stream 
00755     << "The x vector is not the right size. It is " << x.nrow() 
00756     << ", it should be " << this->ncol() << std::endl;
00757    throw OomphLibError(error_message_stream.str(),
00758                        "CCDoubleMatrix::multiply()",
00759                        OOMPH_EXCEPTION_LOCATION);
00760   }
00761  // check that x is not distributed
00762  if (x.distributed())
00763   {
00764    std::ostringstream error_message_stream;
00765    error_message_stream 
00766     << "The x vector cannot be distributed for CCDoubleMatrix "
00767     << "matrix-vector multiply" << std::endl;
00768    throw OomphLibError(error_message_stream.str(),
00769                        "CCDoubleMatrix::multiply()",
00770                        OOMPH_EXCEPTION_LOCATION);
00771   }
00772  // if soln is setup...
00773  if (soln.distribution_setup())
00774   {
00775    // check that soln is not distributed
00776    if (soln.distributed())
00777     {
00778      std::ostringstream error_message_stream;
00779      error_message_stream 
00780       << "The x vector cannot be distributed for CCDoubleMatrix "
00781       << "matrix-vector multiply" << std::endl;
00782      throw OomphLibError(error_message_stream.str(),
00783                          "CCDoubleMatrix::multiply()",
00784                          OOMPH_EXCEPTION_LOCATION);
00785     }
00786    if (soln.nrow() != this->nrow())
00787     {
00788      std::ostringstream error_message_stream;
00789      error_message_stream 
00790       << "The soln vector is setup and therefore must have the same "
00791       << "number of rows as the matrix";
00792      throw OomphLibError(error_message_stream.str(),
00793                          "CCDoubleMatrix::multiply()",
00794                          OOMPH_EXCEPTION_LOCATION);
00795     }
00796    if (*soln.distribution_pt()->communicator_pt() != 
00797        *x.distribution_pt()->communicator_pt())
00798     {
00799      std::ostringstream error_message_stream;
00800      error_message_stream 
00801       << "The soln vector and the x vector must have the same communicator"
00802       << std::endl;
00803      throw OomphLibError(error_message_stream.str(),
00804                          "CCDoubleMatrix::multiply()",
00805                          OOMPH_EXCEPTION_LOCATION);
00806     }
00807   }
00808 #endif
00809 
00810  // if soln is not setup then setup the distribution
00811  if (!soln.distribution_setup())
00812   {
00813    LinearAlgebraDistribution* dist_pt = 
00814     new LinearAlgebraDistribution(x.distribution_pt()->communicator_pt(),
00815                                   this->nrow(),false);
00816    soln.build(dist_pt,0.0);
00817    delete dist_pt;
00818   }
00819 
00820  // zero
00821  soln.initialise(0.0);
00822  
00823  // multiply
00824  double* soln_pt = soln.values_pt();
00825  double* x_pt = x.values_pt();
00826  for (unsigned long j=0;j<N;j++)
00827   {
00828    for (long k=Column_start[j];k<Column_start[j+1];k++)
00829     {
00830      unsigned long i = Row_index[k];
00831      double a_ij = Value[k];
00832      soln_pt[i] += a_ij*x_pt[j];
00833     }
00834   }
00835 }
00836 
00837 
00838 
00839 
00840 //=================================================================
00841 /// Multiply the  transposed matrix by the vector x: soln=A^T x
00842 //=================================================================
00843 void CCDoubleMatrix::multiply_transpose(const DoubleVector &x, 
00844                                         DoubleVector &soln)
00845 {
00846 #ifdef PARANOID
00847  // Check to see if x.size() = ncol().
00848  if (x.nrow()!=this->nrow())
00849   {
00850    std::ostringstream error_message_stream;
00851    error_message_stream 
00852     << "The x vector is not the right size. It is " << x.nrow() 
00853     << ", it should be " << this->nrow() << std::endl;
00854    throw OomphLibError(error_message_stream.str(),
00855                        "CCDoubleMatrix::multiply()",
00856                        OOMPH_EXCEPTION_LOCATION);
00857   }
00858  // check that x is not distributed
00859  if (x.distributed())
00860   {
00861    std::ostringstream error_message_stream;
00862    error_message_stream 
00863     << "The x vector cannot be distributed for CCDoubleMatrix "
00864     << "matrix-vector multiply" << std::endl;
00865    throw OomphLibError(error_message_stream.str(),
00866                        "CCDoubleMatrix::multiply()",
00867                        OOMPH_EXCEPTION_LOCATION);
00868   }
00869  // if soln is setup...
00870  if (soln.distribution_setup())
00871   {
00872    // check that soln is not distributed
00873    if (soln.distributed())
00874     {
00875      std::ostringstream error_message_stream;
00876      error_message_stream 
00877       << "The x vector cannot be distributed for CCDoubleMatrix "
00878       << "matrix-vector multiply" << std::endl;
00879      throw OomphLibError(error_message_stream.str(),
00880                          "CCDoubleMatrix::multiply()",
00881                          OOMPH_EXCEPTION_LOCATION);
00882     }
00883    if (soln.nrow() != this->ncol())
00884     {
00885      std::ostringstream error_message_stream;
00886      error_message_stream 
00887       << "The soln vector is setup and therefore must have the same "
00888       << "number of columns as the matrix";
00889      throw OomphLibError(error_message_stream.str(),
00890                          "CCDoubleMatrix::multiply()",
00891                          OOMPH_EXCEPTION_LOCATION);
00892     }
00893    if (*soln.distribution_pt()->communicator_pt() != 
00894        *x.distribution_pt()->communicator_pt())
00895     {
00896      std::ostringstream error_message_stream;
00897      error_message_stream 
00898       << "The soln vector and the x vector must have the same communicator"
00899       << std::endl;
00900      throw OomphLibError(error_message_stream.str(),
00901                          "CCDoubleMatrix::multiply()",
00902                          OOMPH_EXCEPTION_LOCATION);
00903     }
00904   }
00905 #endif
00906 
00907  // if soln is not setup then setup the distribution
00908  if (!soln.distribution_setup())
00909   {
00910    LinearAlgebraDistribution* dist_pt = 
00911     new LinearAlgebraDistribution(x.distribution_pt()->communicator_pt(),
00912                                   this->ncol(),false);
00913    soln.build(dist_pt,0.0);
00914    delete dist_pt;
00915   }
00916 
00917  // zero
00918  soln.initialise(0.0);
00919  
00920  // Matrix vector product
00921  double* soln_pt = soln.values_pt();
00922  double* x_pt = x.values_pt();
00923  for (unsigned long i=0;i<N;i++)
00924   {  
00925    
00926    for (long k=Column_start[i];k<Column_start[i+1];k++)
00927     {
00928      unsigned long j=Row_index[k];
00929      double a_ij=Value[k];
00930      soln_pt[j]+=a_ij*x_pt[i];
00931     }
00932   }
00933 }
00934 
00935 
00936 
00937 
00938 //===========================================================================
00939 /// Function to multiply this matrix by the CCDoubleMatrix matrix_in
00940 /// The multiplication method used can be selected using the flag
00941 /// Matrix_matrix_multiply_method. By default Method 2 is used.
00942 /// Method 1: First runs through this matrix and matrix_in to find the storage
00943 ///           requirements for result - arrays of the correct size are 
00944 ///           then allocated before performing the calculation.
00945 ///           Minimises memory requirements but more costly.
00946 /// Method 2: Grows storage for values and column indices of result 'on the
00947 ///           fly' using an array of maps. Faster but more memory
00948 ///           intensive.
00949 /// Method 3: Grows storage for values and column indices of result 'on the
00950 ///           fly' using a vector of vectors. Not particularly impressive
00951 ///           on the platforms we tried...
00952 //=============================================================================
00953 void CCDoubleMatrix::multiply(const CCDoubleMatrix& matrix_in,
00954                               CCDoubleMatrix& result)
00955 {
00956 #ifdef PARANOID
00957  // check matrix dimensions are compatable
00958  if ( this->ncol() != matrix_in.nrow()  )
00959   {
00960    std::ostringstream error_message;
00961    error_message 
00962     << "Matrix dimensions incompatable for matrix-matrix multiplication"
00963     << "ncol() for first matrix:" << this->ncol()
00964     << "nrow() for second matrix: " << matrix_in.nrow();
00965    
00966    throw OomphLibError(error_message.str(),
00967                        "CCDoubleMatrix::multiply()",
00968                        OOMPH_EXCEPTION_LOCATION);
00969   }
00970 #endif
00971  
00972  // NB N is number of rows!
00973  unsigned long N = this->nrow();
00974  unsigned long M = matrix_in.ncol();
00975  unsigned long Nnz = 0;
00976  
00977  // pointers to arrays which store result
00978  int* Column_start;
00979  double* Value;
00980  int* Row_index;
00981 
00982  // get pointers to matrix_in
00983  const int* matrix_in_col_start = matrix_in.column_start();
00984  const int* matrix_in_row_index = matrix_in.row_index();
00985  const double* matrix_in_value = matrix_in.value();
00986 
00987  // get pointers to this matrix
00988  const double* this_value = this->value();
00989  const int* this_col_start = this->column_start();
00990  const int* this_row_index = this->row_index();
00991 
00992  // set method
00993  unsigned method = Matrix_matrix_multiply_method;
00994 
00995  // clock_t clock1 = clock();
00996 
00997  // METHOD 1
00998  // --------
00999  if (method==1)
01000  {
01001   // allocate storage for column starts
01002   Column_start = new int[M+1];
01003   Column_start[0]=0;
01004 
01005   // a set to store number of non-zero rows in each column of result
01006   std::set<unsigned> rows;
01007 
01008   // run through columns of this matrix and matrix_in to find number of
01009   // non-zero entries in each column of result
01010   for (unsigned long this_col = 0; this_col<M; this_col++)
01011   {
01012    // run through non-zeros in this_col of this matrix
01013    for (int this_ptr = this_col_start[this_col];
01014         this_ptr < this_col_start[this_col+1];
01015         this_ptr++)
01016    {
01017     // find row index for non-zero
01018     unsigned matrix_in_col = this_row_index[this_ptr];
01019 
01020     // run through corresponding column in matrix_in
01021     for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
01022          matrix_in_ptr < matrix_in_col_start[matrix_in_col+1];
01023          matrix_in_ptr++)
01024     {
01025      // find row index for non-zero in matrix_in and store in rows
01026      rows.insert(matrix_in_row_index[matrix_in_ptr]);
01027     }
01028    }
01029    // update Column_start
01030    Column_start[this_col+1] = Column_start[this_col] + rows.size();
01031 
01032    // wipe values in rows
01033    rows.clear();
01034   }
01035 
01036   // set Nnz
01037   Nnz = Column_start[M];
01038 
01039   // allocate arrays for result
01040   Value = new double[Nnz];
01041   Row_index = new int[Nnz];
01042 
01043   // set all values of Row_index to -1
01044   for (unsigned long i=0;i<Nnz;i++)
01045    Row_index[i] = -1;
01046 
01047   // Calculate values for result - first run through columns of this matrix
01048   for (unsigned long this_col = 0; this_col<M; this_col++)
01049    {
01050     // run through non-zeros in this_column
01051     for (int this_ptr = this_col_start[this_col];
01052          this_ptr < this_col_start[this_col+1];
01053          this_ptr++)
01054      {
01055       // find value of non-zero
01056       double this_val = this_value[this_ptr];
01057       
01058       // find row associated with non-zero
01059       unsigned matrix_in_col = this_row_index[this_ptr];
01060       
01061       // run through corresponding column in matrix_in
01062       for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
01063            matrix_in_ptr < matrix_in_col_start[matrix_in_col+1];
01064            matrix_in_ptr++)
01065        {
01066         // find row index for non-zero in matrix_in
01067         int row = matrix_in_row_index[matrix_in_ptr];
01068         
01069         // find position in result to insert value
01070         for(int ptr = Column_start[this_col];
01071             ptr <= Column_start[this_col+1];
01072             ptr++)
01073          {
01074           if (ptr == Column_start[this_col+1])
01075            {
01076             // error - have passed end of column without finding
01077             // correct row index
01078             std::ostringstream error_message;
01079             error_message << "Error inserting value in result";
01080             
01081             throw OomphLibError(error_message.str(),
01082                                 "CCDoubleMatrix::multiply()",
01083                                 OOMPH_EXCEPTION_LOCATION);
01084            }
01085           else if (Row_index[ptr] == -1 )
01086            {
01087             // first entry for this row index
01088             Row_index[ptr] = row;
01089             Value[ptr] = this_val * matrix_in_value[matrix_in_ptr];
01090             break;
01091            }
01092           else if ( Row_index[ptr] == row )
01093            {
01094             // row index already exists - add value
01095             Value[ptr] += this_val * matrix_in_value[matrix_in_ptr];
01096             break;
01097            }
01098          }
01099        }
01100      }
01101    }
01102  }
01103  
01104  // METHOD 2
01105  // --------
01106  else if (method==2)
01107   {
01108    // generate array of maps to store values for result
01109    std::map<int,double>* result_maps = new std::map<int,double>[M];
01110    
01111    // run through columns of this matrix
01112    for (unsigned long this_col = 0; this_col<M; this_col++)
01113     {
01114      // run through non-zeros in this_col
01115      for (int this_ptr = this_col_start[this_col];
01116           this_ptr < this_col_start[this_col+1];
01117           this_ptr++)
01118       {
01119        // find value of non-zero
01120        double this_val = this_value[this_ptr];
01121        
01122        // find row index associated with non-zero
01123        unsigned matrix_in_col = this_row_index[this_ptr];
01124        
01125        // run through corresponding column in matrix_in
01126        for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
01127             matrix_in_ptr < matrix_in_col_start[matrix_in_col+1];
01128             matrix_in_ptr++)
01129         {
01130          // find row index for non-zero in matrix_in
01131          int row = matrix_in_row_index[matrix_in_ptr];
01132          
01133          // insert value
01134          result_maps[this_col][row] += 
01135           this_val * matrix_in_value[matrix_in_ptr];
01136         }
01137       }
01138     }
01139    
01140    // allocate Column_start
01141    Column_start = new int[M+1];
01142    
01143    // copy across column starts
01144    Column_start[0] = 0;
01145    for (unsigned long col=0; col<M; col++)
01146     {
01147      int size = result_maps[col].size();
01148      Column_start[col+1] = Column_start[col] + size;
01149     }
01150    
01151    // set Nnz
01152    Nnz = Column_start[M];
01153    
01154    // allocate other arrays
01155    Value = new double[Nnz];
01156    Row_index = new int[Nnz];
01157    
01158    // copy values and row indices
01159    for (unsigned long col=0; col<M; col++)
01160     {
01161      unsigned ptr = Column_start[col];
01162      for (std::map<int,double>::iterator i = result_maps[col].begin();
01163           i != result_maps[col].end();
01164           i ++)
01165       {
01166        Row_index[ptr]= i->first;
01167        Value[ptr] = i->second;
01168        ptr++;
01169       }
01170     }
01171    
01172    // tidy up memory
01173    delete[] result_maps;
01174   }
01175  
01176  // METHOD 3
01177  // --------
01178  else if (method==3)
01179   {
01180    // vectors of vectors to store results
01181    std::vector< std::vector<int> > result_rows(N);
01182    std::vector< std::vector<double> > result_vals(N);
01183    
01184    // run through the columns of this matrix
01185   for (unsigned long this_col = 0; this_col<M; this_col++)
01186    {
01187     // run through non-zeros in this_col
01188     for (int this_ptr = this_col_start[this_col];
01189          this_ptr < this_col_start[this_col+1];
01190          this_ptr++)
01191      {
01192       // find value of non-zero
01193       double this_val = this_value[this_ptr];
01194       
01195       // find row index associated with non-zero
01196       unsigned matrix_in_col = this_row_index[this_ptr];
01197       
01198       // run through corresponding column in matrix_in
01199       for (int matrix_in_ptr = matrix_in_col_start[matrix_in_col];
01200            matrix_in_ptr < matrix_in_col_start[matrix_in_col+1];
01201            matrix_in_ptr++)
01202        {
01203         // find row index for non-zero in matrix_in
01204         int row = matrix_in_row_index[matrix_in_ptr];
01205         
01206         // insert value
01207         int size = result_rows[this_col].size();
01208         for (int i = 0; i<=size; i++)
01209          {
01210           if (i==size)
01211            {
01212             // first entry for this row index
01213             result_rows[this_col].push_back(row);
01214             result_vals[this_col].push_back(
01215              this_val*matrix_in_value[matrix_in_ptr]);
01216            }
01217           else if (row==result_rows[this_col][i])
01218            {
01219             // row index already exists
01220             result_vals[this_col][i] += 
01221              this_val * matrix_in_value[matrix_in_ptr];
01222             break;
01223            }
01224          }
01225        }
01226      }
01227    }
01228   
01229   // allocate Column_start
01230   Column_start = new int[M+1];
01231   
01232   // copy across column starts
01233   Column_start[0] = 0;
01234   for (unsigned long col=0; col<M; col++)
01235    {
01236     int size = result_rows[col].size();
01237     Column_start[col+1] = Column_start[col] + size;
01238    }
01239   
01240   // set Nnz
01241   Nnz = Column_start[M];
01242   
01243   // allocate other arrays
01244   Value = new double[Nnz];
01245   Row_index = new int[Nnz];
01246   
01247   // copy across values and row indices
01248   for (unsigned long col=0; col<N; col++)
01249    {
01250     unsigned ptr = Column_start[col];
01251     unsigned n_rows=result_rows[col].size();
01252     for (unsigned i = 0; i < n_rows ; i++)
01253      {
01254       Row_index[ptr] = result_rows[col][i];
01255       Value[ptr] = result_vals[col][i];
01256       ptr++;
01257      }
01258    }
01259   }
01260  
01261  // INCORRECT VALUE FOR METHOD
01262  else
01263   {
01264    std::ostringstream error_message;
01265    error_message << "Incorrect method set in matrix-matrix multiply"
01266                  << "method=" << method << " not allowed";
01267    
01268    throw OomphLibError(error_message.str(),
01269                        "CCDoubleMatrix::multiply()",
01270                        OOMPH_EXCEPTION_LOCATION);
01271   }
01272  
01273  result.build_without_copy(Value, Row_index, Column_start, Nnz, N, M);
01274 }
01275 
01276 
01277 
01278 //=================================================================
01279 /// For every row, find the maximum absolute value of the
01280 /// entries in this row. Set all values that are less than alpha times
01281 /// this maximum to zero and return the resulting matrix in
01282 /// reduced_matrix. Note: Diagonal entries are retained regardless
01283 /// of their size. 
01284 //=================================================================
01285 void CCDoubleMatrix::matrix_reduction(const double &alpha,
01286                                        CCDoubleMatrix &reduced_matrix)
01287 {
01288  // number of columns in matrix
01289  long n_coln=ncol();     
01290 
01291  Vector<double>max_row(nrow(),0.0);
01292 
01293  // Here's the packed format for the new matrix
01294  Vector<int> B_row_start(1);
01295  Vector<int> B_column_index;
01296  Vector<double> B_value;
01297  
01298 
01299  // k is counter for the number of entries in the reduced matrix
01300  unsigned k=0;
01301 
01302  // Initialise row start
01303  B_row_start[0]=0;
01304 
01305  // Loop over columns
01306  for(long i=0;i<n_coln;i++)
01307   { 
01308      
01309    //Loop over entries in columns
01310    for(long j=Column_start[i];j<Column_start[i+1];j++)
01311     {
01312     
01313      // Find max. value in row
01314      if(std::abs(Value[j])>max_row[Row_index[j]])
01315       {
01316        max_row[Row_index[j]]=std::abs(Value[j]);
01317       }
01318     }
01319 
01320    // Decide if we need to retain the entries in the row
01321    for(long j=Column_start[i];j<Column_start[i+1];j++)
01322     {
01323      // If we're on the diagonal or the value is sufficiently large: retain
01324      // i.e. copy across.
01325      if(i==Row_index[j] || std::abs(Value[j])>alpha*max_row[Row_index[j]] )
01326       {
01327        B_value.push_back(Value[j]);
01328        B_column_index.push_back(Row_index[j]);
01329        k++;
01330       }
01331     }
01332    // This writes the row start for the next row -- equal to 
01333    // to the number of entries written so far (C++ zero-based indexing!)
01334    B_row_start.push_back(k);
01335   }
01336 
01337 
01338  // Build the matrix from the compressed format
01339  dynamic_cast<CCDoubleMatrix&>(reduced_matrix).
01340   build(B_value,B_column_index,B_row_start,nrow(),ncol());
01341  }
01342 
01343 
01344 
01345 ///////////////////////////////////////////////////////////////////////////////
01346 ///////////////////////////////////////////////////////////////////////////////
01347 ///////////////////////////////////////////////////////////////////////////////
01348 
01349 //=============================================================================
01350 /// Default constructor
01351 //=============================================================================
01352 CRDoubleMatrix::CRDoubleMatrix()
01353   {
01354    // set the default solver
01355    Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
01356 
01357    // matrix not built
01358    Built = false;
01359 
01360     // set the serial matrix-matrix multiply method
01361 #ifdef HAVE_TRILINOS
01362     Serial_matrix_matrix_multiply_method = 2;
01363 #else
01364     Serial_matrix_matrix_multiply_method = 2;
01365 #endif
01366   }
01367 
01368 //=============================================================================
01369 /// Constructor: just stores the distribution but does not build the
01370 /// matrix
01371 //=============================================================================
01372 CRDoubleMatrix::CRDoubleMatrix(const LinearAlgebraDistribution* 
01373                                distribution_pt)
01374   {
01375    Distribution_pt->rebuild(distribution_pt);
01376 
01377    // set the default solver
01378    Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
01379 
01380    // matrix not built
01381    Built = false;
01382 
01383 // set the serial matrix-matrix multiply method
01384 #ifdef HAVE_TRILINOS
01385     Serial_matrix_matrix_multiply_method = 2;
01386 #else
01387     Serial_matrix_matrix_multiply_method = 2;
01388 #endif
01389 
01390   }
01391 
01392 //=============================================================================
01393 /// \short Constructor: Takes the distribution and the number of columns, as 
01394 /// well as the vector of values, vector of column indices,vector of row 
01395 ///starts.
01396 //=============================================================================
01397 CRDoubleMatrix::CRDoubleMatrix(const LinearAlgebraDistribution* dist_pt,
01398                                const unsigned& ncol,
01399                                const Vector<double>& value, 
01400                                const Vector<int>& column_index,
01401                                const Vector<int>& row_start) 
01402 {
01403  // build the compressed row matrix
01404  CR_matrix.build(value,column_index,row_start,dist_pt->nrow_local(),ncol);
01405 
01406  // store the Distribution
01407  Distribution_pt->rebuild(dist_pt);
01408 
01409  // set the linear solver
01410  Linear_solver_pt = Default_linear_solver_pt = new SuperLUSolver;
01411 
01412  // set the serial matrix-matrix multiply method
01413 #ifdef HAVE_TRILINOS
01414  Serial_matrix_matrix_multiply_method = 2;
01415 #else
01416  Serial_matrix_matrix_multiply_method = 2;
01417 #endif
01418 
01419  // matrix has been built
01420  Built = true;
01421 }
01422 
01423 //=============================================================================
01424 /// Destructor
01425 //=============================================================================
01426 CRDoubleMatrix::~CRDoubleMatrix()
01427 {
01428  this->clear();
01429  delete Default_linear_solver_pt;
01430  Default_linear_solver_pt = 0;
01431 }
01432  
01433 //=============================================================================
01434 /// rebuild the matrix - assembles an empty matrix with a defined distribution
01435 //=============================================================================
01436 void CRDoubleMatrix::build(const LinearAlgebraDistribution* distribution_pt)
01437 {
01438  this->clear();
01439  Distribution_pt->rebuild(distribution_pt);
01440 }
01441 
01442 //=============================================================================
01443 /// clean method
01444 //=============================================================================
01445 void CRDoubleMatrix::clear() 
01446 {
01447  Distribution_pt->clear();
01448  CR_matrix.clean_up_memory();
01449  Built = false;
01450  Linear_solver_pt->clean_up_memory();
01451 }
01452 
01453 //=============================================================================
01454 /// \short build method: Takes the distribution and the number of columns, as 
01455 /// well as the vector of values, vector of column indices,vector of row 
01456 ///starts.
01457 //=============================================================================
01458 void CRDoubleMatrix::build(const LinearAlgebraDistribution* distribution_pt,
01459                            const unsigned& ncol,
01460                            const Vector<double>& value, 
01461                            const Vector<int>& column_index,
01462                            const Vector<int>& row_start)
01463 {
01464  // clear
01465  this->clear();
01466 
01467  // store the Distribution
01468  Distribution_pt->rebuild(distribution_pt);
01469 
01470  // set the linear solver
01471  Default_linear_solver_pt = new SuperLUSolver;   
01472 
01473  // now build the matrix
01474  this->build_matrix(ncol,value,column_index,row_start);
01475 }
01476 
01477 //=============================================================================
01478 /// \short method to rebuild the matrix, but not the distribution
01479 //=============================================================================
01480 void CRDoubleMatrix::build_matrix(const unsigned& ncol,
01481                                   const Vector<double>& value,
01482                                   const Vector<int>& column_index,
01483                                   const Vector<int>& row_start)
01484 {
01485  // call the underlying build method
01486  CR_matrix.clean_up_memory();
01487  CR_matrix.build(value,column_index,row_start,
01488                  Distribution_pt->nrow_local(),ncol);
01489 
01490  // matrix has been build
01491  Built = true;
01492 }
01493 
01494 //=============================================================================
01495 /// \short method to rebuild the matrix, but not the distribution
01496 //=============================================================================
01497 void CRDoubleMatrix::build_matrix_without_copy(const unsigned& ncol,
01498                                                const unsigned& nnz,
01499                                                double* value,
01500                                                int* column_index,
01501                                                int* row_start)
01502 {
01503  // call the underlying build method
01504  CR_matrix.clean_up_memory();
01505  CR_matrix.build_without_copy(value,column_index,row_start,nnz,
01506                               Distribution_pt->nrow_local(),ncol);
01507 
01508  // matrix has been build
01509  Built = true;
01510 }
01511 
01512 //=============================================================================
01513 /// Do LU decomposition
01514 //=============================================================================
01515 void CRDoubleMatrix::ludecompose()
01516 {
01517 #ifdef PARANOID
01518  // check that the this matrix is built
01519  if (!Built)
01520   {
01521    std::ostringstream error_message_stream;
01522    error_message_stream 
01523     << "This matrix has not been built.";
01524    throw OomphLibError(error_message_stream.str(),
01525                        "CRDoubleMatrix::ludecompose()",
01526                        OOMPH_EXCEPTION_LOCATION);
01527   }
01528 #endif
01529 
01530  // factorise using superlu or superlu dist if we oomph has mpi
01531  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->factorise(this);
01532 
01533 }
01534 
01535 //=============================================================================
01536 /// Do back-substitution
01537 //=============================================================================
01538 void CRDoubleMatrix::lubksub(DoubleVector &rhs)
01539 {
01540 #ifdef PARANOID
01541  // check that the rhs vector is setup
01542  if (!rhs.distribution_setup())
01543   {
01544    std::ostringstream error_message_stream;
01545    error_message_stream 
01546     << "The vector rhs has not been setup";
01547    throw OomphLibError(error_message_stream.str(),
01548                        "CRDoubleMatrix::lubksub()",
01549                        OOMPH_EXCEPTION_LOCATION);
01550   }
01551  // check that the rhs vector has the same distribution as this matrix
01552  if (!(*Distribution_pt == *rhs.distribution_pt()))
01553   {
01554    std::ostringstream error_message_stream;
01555    error_message_stream 
01556     << "The vector rhs must have the same distribution as the matrix";
01557    throw OomphLibError(error_message_stream.str(),
01558                        "CRDoubleMatrix::lubksup()",
01559                        OOMPH_EXCEPTION_LOCATION);
01560   }
01561 #endif
01562 
01563  // backsub
01564  DoubleVector rhs_copy(rhs);
01565  static_cast<SuperLUSolver*>(Default_linear_solver_pt)->backsub(rhs_copy,rhs);
01566 }
01567 
01568 //=============================================================================
01569 ///  Multiply the matrix by the vector x
01570 //=============================================================================
01571 void CRDoubleMatrix::multiply(const DoubleVector &x, DoubleVector &soln)
01572 {
01573 #ifdef PARANOID
01574  // check that this matrix is built
01575  if (!Built)
01576   {
01577    std::ostringstream error_message_stream;
01578    error_message_stream 
01579     << "This matrix has not been built";
01580    throw OomphLibError(error_message_stream.str(),
01581                        "CRDoubleMatrix::multiply()",
01582                        OOMPH_EXCEPTION_LOCATION);
01583   }
01584  // check that the distribution of x is setup
01585  if (!x.distribution_setup())
01586   {
01587  std::ostringstream error_message_stream;
01588    error_message_stream 
01589     << "The distribution of the vector x must be setup";
01590    throw OomphLibError(error_message_stream.str(),
01591                        "CRDoubleMatrix::multiply()",
01592                        OOMPH_EXCEPTION_LOCATION);
01593   }
01594  // Check to see if x.size() = ncol().
01595  if (this->ncol() != x.distribution_pt()->nrow())
01596   {
01597    std::ostringstream error_message_stream;
01598    error_message_stream 
01599     << "The number of rows in the x vector and the number of columns in the "
01600     << "matrix must be the same";
01601    throw OomphLibError(error_message_stream.str(),
01602                        "CRDoubleMatrix::multiply()",
01603                        OOMPH_EXCEPTION_LOCATION);
01604   }
01605  // if the soln is distributed
01606  if (soln.distribution_setup())
01607   {
01608    if (!(*soln.distribution_pt() == Distribution_pt))
01609     {
01610      std::ostringstream error_message_stream;
01611      error_message_stream 
01612       << "The soln vector is setup and therefore must have the same "
01613       << "distribution as the matrix";
01614      throw OomphLibError(error_message_stream.str(),
01615                          "CRDoubleMatrix::multiply()",
01616                          OOMPH_EXCEPTION_LOCATION);
01617     }
01618   }
01619 #endif
01620 
01621  // if soln is not setup then setup the distribution
01622  if (!soln.distribution_setup())
01623   {
01624    // Resize and initialize the solution vector
01625    soln.build(this->distribution_pt(),0.0);
01626   }
01627 
01628  // if distributed and on more than one processor use trilinos
01629  // otherwise use the oomph-lib methods
01630  if (this->distributed() && 
01631      this->distribution_pt()->communicator_pt()->nproc() > 1)
01632   {
01633 #ifdef HAVE_TRILINOS
01634  // This will only work if we have trilinos on board
01635  TrilinosHelpers::multiply(*this,x,soln);
01636 #else
01637    std::ostringstream error_message_stream;
01638    error_message_stream 
01639     << "Matrix-vector product on multiple processors with distributed "
01640     << "CRDoubleMatrix requires Trilinos.";
01641    throw OomphLibError(error_message_stream.str(),
01642                        "CRDoubleMatrix::multiply()",
01643                        OOMPH_EXCEPTION_LOCATION);
01644 #endif
01645   }
01646  else
01647   {   
01648    unsigned n = this->nrow();
01649    const int* row_start = CR_matrix.row_start();
01650    const int* column_index = CR_matrix.column_index();
01651    const double* value = CR_matrix.value();
01652    double* soln_pt = soln.values_pt();
01653    double* x_pt = x.values_pt();
01654    for (unsigned long i=0;i<n;i++)
01655     {  
01656      soln_pt[i] = 0.0;
01657      for (long k=row_start[i];k<row_start[i+1];k++)
01658       {
01659        unsigned long j=column_index[k];
01660        double a_ij=value[k];
01661        soln_pt[i]+=a_ij*x_pt[j];
01662       }
01663     }
01664   }
01665 }
01666 
01667 
01668 
01669 //=================================================================
01670 /// Multiply the  transposed matrix by the vector x: soln=A^T x
01671 //=================================================================
01672 void CRDoubleMatrix::multiply_transpose(const DoubleVector &x, 
01673                                         DoubleVector &soln)
01674 {
01675 #ifdef PARANOID
01676  // check that this matrix is built
01677  if (!Built)
01678   {
01679    std::ostringstream error_message_stream;
01680    error_message_stream 
01681     << "This matrix has not been built";
01682    throw OomphLibError(error_message_stream.str(),
01683                        "CRDoubleMatrix::multiply_transpose()",
01684                        OOMPH_EXCEPTION_LOCATION);
01685   }
01686  // Check to see if x.size() = ncol().
01687  if (!(*Distribution_pt == *x.distribution_pt()))
01688   {
01689    std::ostringstream error_message_stream;
01690    error_message_stream 
01691     << "The x vector and this matrix must have the same distribution.";
01692    throw OomphLibError(error_message_stream.str(),
01693                        "CRDoubleMatrix::multiply_transpose()",
01694                        OOMPH_EXCEPTION_LOCATION);
01695   }
01696  // if soln is setup then it should have the same distribution as x
01697  if (soln.distribution_setup())
01698   {
01699    if (soln.distribution_pt()->nrow() != this->ncol())
01700     {
01701      std::ostringstream error_message_stream;
01702      error_message_stream 
01703       << "The soln vector is setup and therefore must have the same "
01704       << "number of rows as the vector x";
01705      throw OomphLibError(error_message_stream.str(),
01706                          "CRDoubleMatrix::multiply_transpose()",
01707                          OOMPH_EXCEPTION_LOCATION);
01708     }
01709   }
01710 #endif
01711 
01712  // if soln is not setup then setup the distribution
01713  if (!soln.distribution_setup())
01714   {
01715    LinearAlgebraDistribution* dist_pt = 
01716     new LinearAlgebraDistribution(x.distribution_pt()->communicator_pt(),
01717                                   this->ncol(),this->distributed());
01718    soln.build(dist_pt,0.0);
01719    delete dist_pt;
01720   }
01721 
01722  if (this->distributed() && 
01723      this->distribution_pt()->communicator_pt()->nproc() > 1)
01724   {
01725 #ifdef HAVE_TRILINOS
01726    // This will only work if we have trilinos on board
01727    TrilinosHelpers::multiply(*this,x,soln);
01728 #else
01729      std::ostringstream error_message_stream;
01730      error_message_stream 
01731       << "Matrix-vector product on multiple processors with distributed "
01732       << "CRDoubleMatrix requires Trilinos.";
01733      throw OomphLibError(error_message_stream.str(),
01734                          "CRDoubleMatrix::multiply_transpose()",
01735                          OOMPH_EXCEPTION_LOCATION);
01736 #endif
01737   }
01738  else
01739   {
01740    unsigned n = this->nrow();
01741    const int* row_start = CR_matrix.row_start();
01742    const int* column_index = CR_matrix.column_index();
01743    const double* value = CR_matrix.value();
01744    double* soln_pt = soln.values_pt();
01745    double* x_pt = x.values_pt();
01746    // Matrix vector product
01747    for (unsigned long i=0;i<n;i++)
01748     {  
01749      for (long k=row_start[i];k<row_start[i+1];k++)
01750       {
01751        unsigned long j=column_index[k];
01752        double a_ij=value[k];
01753        soln_pt[j]+=a_ij*x_pt[i];
01754       }
01755     }
01756   }
01757 }
01758 
01759 
01760 //===========================================================================
01761 /// Function to multiply this matrix by the CRDoubleMatrix matrix_in.\n
01762 /// In a serial matrix, there are 4 methods available: \n
01763 /// Method 1: First runs through this matrix and matrix_in to find the storage
01764 ///           requirements for result - arrays of the correct size are 
01765 ///           then allocated before performing the calculation.
01766 ///           Minimises memory requirements but more costly. \n
01767 /// Method 2: Grows storage for values and column indices of result 'on the
01768 ///           fly' using an array of maps. Faster but more memory
01769 ///           intensive. \n
01770 /// Method 3: Grows storage for values and column indices of result 'on the
01771 ///           fly' using a vector of vectors. Not particularly impressive
01772 ///           on the platforms we tried... \n
01773 /// Method 4: Trilinos Epetra Matrix Matrix multiply.\n
01774 /// Method 5: Trilinox Epetra Matrix Matrix Mulitply (ml based) \n
01775 /// If Trilinos is installed then Method 4 is employed by default, otherwise
01776 /// Method 2 is employed by default. \n
01777 /// In a distributed matrix, only Trilinos Epetra Matrix Matrix multiply
01778 /// is available.
01779 //=============================================================================
01780 void CRDoubleMatrix::multiply(CRDoubleMatrix& matrix_in,
01781                               CRDoubleMatrix& result)
01782 {
01783 #ifdef PARANOID
01784  // check that this matrix is built
01785  if (!Built)
01786   {
01787    std::ostringstream error_message_stream;
01788    error_message_stream 
01789     << "This matrix has not been built";
01790    throw OomphLibError(error_message_stream.str(),
01791                        "CRDoubleMatrix::multiply()",
01792                        OOMPH_EXCEPTION_LOCATION);
01793   }
01794  // check that this matrix is built
01795  if (!matrix_in.built())
01796   {
01797    std::ostringstream error_message_stream;
01798    error_message_stream 
01799     << "This matrix matrix_in has not been built";
01800    throw OomphLibError(error_message_stream.str(),
01801                        "CRDoubleMatrix::multiply()",
01802                        OOMPH_EXCEPTION_LOCATION);
01803   }
01804  // if soln is setup then it should have the same distribution as x
01805  if (result.distribution_setup())
01806   {
01807    if (!(*result.distribution_pt() == *Distribution_pt))
01808     {
01809      std::ostringstream error_message_stream;
01810      error_message_stream 
01811       << "The matrix result is setup and therefore must have the same "
01812       << "distribution as the vector x";
01813      throw OomphLibError(error_message_stream.str(),
01814                          "CRDoubleMatrix::multiply()",
01815                          OOMPH_EXCEPTION_LOCATION);
01816     }
01817   }
01818 #endif
01819 
01820  // if the result has not been setup, then store the distribution
01821  if (!result.distribution_setup())
01822   {
01823    result.build(Distribution_pt);
01824   }
01825    
01826  // short name for Serial_matrix_matrix_multiply_method
01827  unsigned method = Serial_matrix_matrix_multiply_method;
01828 
01829  // if this matrix is not distributed and matrix in is not distributed
01830  if (!this->distributed() && !matrix_in.distributed() && 
01831      ((method == 1) || (method == 2) || (method == 3)))
01832   {
01833    // NB N is number of rows!
01834    unsigned long N = this->nrow();
01835    unsigned long M = matrix_in.ncol();
01836    unsigned long Nnz = 0;
01837    
01838    // pointers to arrays which store result
01839    int* Row_start = 0;
01840    double* Value = 0;
01841    int* Column_index = 0;
01842    
01843    // get pointers to matrix_in
01844    const int* matrix_in_row_start = matrix_in.row_start();
01845    const int* matrix_in_column_index = matrix_in.column_index();
01846    const double* matrix_in_value = matrix_in.value();
01847    
01848    // get pointers to this matrix
01849    const double* this_value = this->value();
01850    const int* this_row_start = this->row_start();
01851    const int* this_column_index = this->column_index();
01852    
01853    //clock_t clock1 = clock();
01854    
01855    // METHOD 1
01856    // --------
01857    if (method==1)
01858     {
01859      // allocate storage for row starts
01860      Row_start = new int[N+1];
01861      Row_start[0]=0;
01862      
01863      // a set to store number of non-zero columns in each row of result
01864      std::set<unsigned> columns;
01865      
01866      // run through rows of this matrix and matrix_in to find number of
01867      // non-zero entries in each row of result
01868      for (unsigned long this_row = 0; this_row<N; this_row++)
01869       {
01870        // run through non-zeros in this_row of this matrix
01871        for (int this_ptr = this_row_start[this_row];
01872             this_ptr < this_row_start[this_row+1];
01873             this_ptr++)
01874         {
01875          // find column index for non-zero
01876          int matrix_in_row = this_column_index[this_ptr];
01877          
01878          // run through corresponding row in matrix_in
01879          for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
01880               matrix_in_ptr < matrix_in_row_start[matrix_in_row+1];
01881               matrix_in_ptr++)
01882           {
01883            // find column index for non-zero in matrix_in and store in columns
01884            columns.insert(matrix_in_column_index[matrix_in_ptr]);
01885           }
01886         }
01887        // update Row_start
01888        Row_start[this_row+1] = Row_start[this_row] + columns.size();
01889        
01890        // wipe values in columns
01891        columns.clear();
01892       }
01893      
01894      // set Nnz
01895      Nnz = Row_start[N];
01896      
01897      // allocate arrays for result
01898      Value = new double[Nnz];
01899      Column_index = new int[Nnz];
01900      
01901      // set all values of Column_index to -1
01902      for (unsigned long i=0; i<Nnz; i++)
01903       {
01904        Column_index[i] = -1;
01905       }
01906      
01907      // Calculate values for result - first run through rows of this matrix
01908      for (unsigned long this_row = 0; this_row<N; this_row++)
01909       {
01910        // run through non-zeros in this_row
01911        for (int this_ptr = this_row_start[this_row];
01912             this_ptr < this_row_start[this_row+1];
01913             this_ptr++)
01914         {
01915          // find value of non-zero
01916          double this_val = this_value[this_ptr];
01917          
01918          // find column associated with non-zero
01919          int matrix_in_row = this_column_index[this_ptr];
01920          
01921          // run through corresponding row in matrix_in
01922          for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
01923               matrix_in_ptr < matrix_in_row_start[matrix_in_row+1];
01924               matrix_in_ptr++)
01925           {
01926            // find column index for non-zero in matrix_in
01927            int col = matrix_in_column_index[matrix_in_ptr];
01928            
01929            // find position in result to insert value
01930            for(int ptr = Row_start[this_row];
01931                ptr <= Row_start[this_row+1];
01932                ptr++)
01933             {
01934              if (ptr == Row_start[this_row+1])
01935               {
01936                // error - have passed end of row without finding
01937                // correct column
01938                std::ostringstream error_message;
01939                error_message << "Error inserting value in result";
01940                
01941                throw OomphLibError(error_message.str(),
01942                            "CRDoubleMatrix::multiply()",
01943                                    OOMPH_EXCEPTION_LOCATION);
01944               }
01945              else if (  Column_index[ptr] == -1 )
01946               {
01947                // first entry for this column index
01948                Column_index[ptr] = col;
01949                Value[ptr] = this_val * matrix_in_value[matrix_in_ptr];
01950                break;
01951               }
01952              else if ( Column_index[ptr] == col )
01953               {
01954                // column index already exists - add value
01955                Value[ptr] += this_val * matrix_in_value[matrix_in_ptr];
01956                break;
01957               }
01958             }
01959           }
01960         }
01961       }
01962     }
01963    
01964    // METHOD 2
01965    // --------
01966    else if (method==2)
01967     {
01968      // generate array of maps to store values for result
01969      std::map<int,double>* result_maps = new std::map<int,double>[N];
01970      
01971      // run through rows of this matrix
01972      for (unsigned long this_row = 0; this_row<N; this_row++)
01973       {
01974        // run through non-zeros in this_row
01975        for (int this_ptr = this_row_start[this_row];
01976             this_ptr < this_row_start[this_row+1];
01977             this_ptr++)
01978         {
01979          // find value of non-zero
01980          double this_val = this_value[this_ptr];
01981          
01982          // find column index associated with non-zero
01983          int matrix_in_row = this_column_index[this_ptr];
01984          
01985          // run through corresponding row in matrix_in
01986          for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
01987               matrix_in_ptr < matrix_in_row_start[matrix_in_row+1];
01988               matrix_in_ptr++)
01989           {
01990            // find column index for non-zero in matrix_in
01991            int col = matrix_in_column_index[matrix_in_ptr];
01992            
01993            // insert value
01994            result_maps[this_row][col] += this_val * matrix_in_value[matrix_in_ptr];
01995           }
01996         }
01997       }
01998      
01999      // allocate Row_start
02000      Row_start = new int[N+1];
02001      
02002      // copy across row starts
02003      Row_start[0] = 0;
02004      for (unsigned long row=0; row<N; row++)
02005       {
02006        int size = result_maps[row].size();
02007        Row_start[row+1] = Row_start[row] + size;
02008       }
02009      
02010      // set Nnz
02011      Nnz = Row_start[N];
02012      
02013      // allocate other arrays
02014      Value = new double[Nnz];
02015      Column_index = new int[Nnz];
02016      
02017      // copy values and column indices
02018      for (unsigned long row=0; row<N; row++)
02019       {
02020        unsigned ptr = Row_start[row];
02021        for (std::map<int,double>::iterator i = result_maps[row].begin();
02022             i != result_maps[row].end();
02023             i ++)
02024         {
02025          Column_index[ptr]= i->first;
02026          Value[ptr] = i->second;
02027          ptr++;
02028         }
02029       }
02030      
02031      // tidy up memory
02032      delete[] result_maps;
02033     }
02034  
02035    // METHOD 3
02036    // --------
02037    else if (method==3)
02038     {
02039      // vectors of vectors to store results
02040      std::vector< std::vector<int> > result_cols(N);
02041      std::vector< std::vector<double> > result_vals(N);
02042      
02043      // run through the rows of this matrix
02044      for (unsigned long this_row = 0; this_row<N; this_row++)
02045       {
02046        // run through non-zeros in this_row
02047        for (int this_ptr = this_row_start[this_row];
02048             this_ptr < this_row_start[this_row+1];
02049             this_ptr++)
02050         {
02051          // find value of non-zero
02052          double this_val = this_value[this_ptr];
02053          
02054          // find column index associated with non-zero
02055          int matrix_in_row = this_column_index[this_ptr];
02056          
02057          // run through corresponding row in matrix_in
02058          for (int matrix_in_ptr = matrix_in_row_start[matrix_in_row];
02059             matrix_in_ptr < matrix_in_row_start[matrix_in_row+1];
02060               matrix_in_ptr++)
02061           {
02062            // find column index for non-zero in matrix_in
02063            int col = matrix_in_column_index[matrix_in_ptr];
02064            
02065            // insert value
02066            int size = result_cols[this_row].size();
02067            for (int i = 0; i<=size; i++)
02068             {
02069              if (i==size)
02070               {
02071                // first entry for this column
02072                result_cols[this_row].push_back(col);
02073                result_vals[this_row].push_back(
02074                 this_val*matrix_in_value[matrix_in_ptr]);
02075               }
02076              else if (col==result_cols[this_row][i])
02077               {
02078                // column already exists
02079                result_vals[this_row][i] += this_val * 
02080                 matrix_in_value[matrix_in_ptr];
02081                break;
02082               }
02083             }
02084           }
02085         }
02086       }
02087      
02088      // allocate Row_start
02089      Row_start = new int[N+1];
02090      
02091      // copy across row starts
02092      Row_start[0] = 0;
02093      for (unsigned long row=0; row<N; row++)
02094       {
02095        int size = result_cols[row].size();
02096        Row_start[row+1] = Row_start[row] + size;
02097       }
02098      
02099      // set Nnz
02100      Nnz = Row_start[N];
02101      
02102      // allocate other arrays
02103      Value = new double[Nnz];
02104      Column_index = new int[Nnz];
02105      
02106      // copy across values and column indices
02107      for (unsigned long row=0; row<N; row++)
02108       {
02109        unsigned ptr = Row_start[row];
02110        unsigned nnn=result_cols[row].size();
02111        for (unsigned i = 0; i < nnn; i++) 
02112         {
02113          Column_index[ptr] = result_cols[row][i];
02114          Value[ptr] = result_vals[row][i];
02115          ptr++;
02116         }
02117       }
02118     }
02119    
02120    // build
02121    result.build_matrix_without_copy(M, Nnz, Value, Column_index, Row_start);
02122   }
02123   
02124  // else we have to use trilinos
02125  else
02126   {
02127 #ifdef HAVE_TRILINOS
02128      bool use_ml = false;
02129      if (method == 5)
02130       {
02131        use_ml = true;
02132       }
02133      TrilinosHelpers::multiply(*this,matrix_in,result,use_ml);
02134 #else
02135      std::ostringstream error_message;
02136      error_message << "Serial_matrix_matrix_multiply_method = "
02137                    << Serial_matrix_matrix_multiply_method 
02138                    << " requires trilinos.";
02139      throw OomphLibError(error_message.str(),
02140                          "CRDoubleMatrix::multiply()",
02141                          OOMPH_EXCEPTION_LOCATION);
02142 #endif
02143   }
02144 }
02145   
02146  
02147 
02148 //=================================================================
02149 /// For every row, find the maximum absolute value of the
02150 /// entries in this row. Set all values that are less than alpha times
02151 /// this maximum to zero and return the resulting matrix in
02152 /// reduced_matrix. Note: Diagonal entries are retained regardless
02153 /// of their size. 
02154 //=================================================================
02155 void CRDoubleMatrix::matrix_reduction(const double &alpha,
02156                                        CRDoubleMatrix &reduced_matrix)
02157 {
02158  // number of rows in matrix
02159  long n_row=nrow_local();     
02160  double max_row;
02161  
02162  // Here's the packed format for the new matrix
02163  Vector<int> B_row_start(1);
02164  Vector<int> B_column_index;
02165  Vector<double> B_value;
02166  
02167  // get pointers to the underlying data
02168  const int* row_start = CR_matrix.row_start();
02169  const int* column_index = CR_matrix.column_index();
02170  const double* value = CR_matrix.value();
02171  
02172  // k is counter for the number of entries in the reduced matrix
02173  unsigned k=0;
02174 
02175  // Initialise row start
02176  B_row_start[0]=0;
02177 
02178  // Loop over rows
02179  for(long i=0;i<n_row;i++)
02180   { 
02181    // Initialise max value in row
02182    max_row=0.0;
02183    
02184    //Loop over entries in columns
02185    for(long j=row_start[i];j<row_start[i+1];j++)
02186     {
02187      // Find max. value in row
02188      if(std::abs(value[j])>max_row)
02189       {
02190        max_row=std::abs(value[j]);
02191       }
02192     }
02193 
02194    // Decide if we need to retain the entries in the row
02195    for(long j=row_start[i];j<row_start[i+1];j++)
02196     {
02197      // If we're on the diagonal or the value is sufficiently large: retain
02198      // i.e. copy across.
02199      if(i==column_index[j] || std::abs(value[j])>alpha*max_row )
02200       {
02201        B_value.push_back(value[j]);
02202        B_column_index.push_back(column_index[j]);
02203        k++;
02204       }
02205     }
02206    // This writes the row start for the next row -- equal to 
02207    // to the number of entries written so far (C++ zero-based indexing!)
02208    B_row_start.push_back(k);
02209   }
02210  
02211  // Build the matrix from the compressed format
02212  dynamic_cast<CRDoubleMatrix&>(reduced_matrix).
02213   build_matrix(this->ncol(),B_value,B_column_index,B_row_start);
02214  }
02215 
02216 //=============================================================================
02217 /// if this matrix is distributed then a the equivalent global matrix is built
02218 /// using new and returned. The calling method is responsible for the 
02219 /// destruction of the new matrix.
02220 //=============================================================================
02221 CRDoubleMatrix* CRDoubleMatrix::return_global_matrix()
02222 {
02223 #ifdef OOMPH_HAS_MPI
02224  // if this matrix is not distributed then this method is redundant
02225  if (!this->distributed() || 
02226      this->distribution_pt()->communicator_pt()->nproc() == 1)
02227   {
02228    return new CRDoubleMatrix(*this);
02229   }
02230 
02231  // nnz 
02232  int nnz = this->nnz();
02233 
02234  // my nrow local
02235  unsigned nrow_local = this->nrow_local();
02236 
02237  // nrow global
02238  unsigned nrow = this->nrow();
02239    
02240  // cache nproc
02241  int nproc = Distribution_pt->communicator_pt()->nproc();
02242 
02243  // get the nnzs on the other processors
02244  int* dist_nnz_pt = new int[nproc];
02245  MPI_Allgather(&nnz,1,MPI_INT,
02246                dist_nnz_pt,1,MPI_INT,
02247                Distribution_pt->communicator_pt()->mpi_comm());
02248    
02249  // create a int vector of first rows and nrow local and compute nnz global
02250  int* dist_first_row = new int[nproc];
02251  int* dist_nrow_local =  new int[nproc];
02252  int nnz_global = 0;
02253  for (int p = 0; p < nproc; p++)
02254   {
02255    nnz_global += dist_nnz_pt[p];
02256    dist_first_row[p] = this->first_row(p);
02257    dist_nrow_local[p] = this->nrow_local(p);
02258   }
02259 
02260  // conpute the offset for the values and column index data
02261  int* nnz_offset = new int[nproc];
02262  nnz_offset[0] = 0;
02263  for (int p = 1; p < nproc; p++)
02264   {
02265    nnz_offset[p] = nnz_offset[p-1] + dist_nnz_pt[p-1];
02266   }
02267    
02268  // get pointers to the (current) distributed data
02269  int* dist_row_start = this->row_start();
02270  int* dist_column_index = this->column_index();
02271  double* dist_value = this->value();
02272    
02273  // space for the global matrix
02274  int* global_row_start = new int[nrow+1];
02275  int* global_column_index = new int[nnz_global];
02276  double* global_value = new double[nnz_global];
02277 
02278  // get the row starts
02279  MPI_Allgatherv(dist_row_start,nrow_local,MPI_INT,
02280                 global_row_start,dist_nrow_local,dist_first_row,MPI_INT,
02281                 Distribution_pt->communicator_pt()->mpi_comm());
02282    
02283  // get the column indexes
02284  MPI_Allgatherv(dist_column_index,nnz,MPI_INT,
02285                 global_column_index,dist_nnz_pt,nnz_offset,MPI_INT,
02286                 Distribution_pt->communicator_pt()->mpi_comm());
02287  
02288  // get the values
02289  MPI_Allgatherv(dist_value,nnz,MPI_DOUBLE,
02290                 global_value,dist_nnz_pt,nnz_offset,MPI_DOUBLE,
02291                 Distribution_pt->communicator_pt()->mpi_comm());
02292    
02293  // finally the last row start
02294  global_row_start[nrow] = nnz_global;
02295    
02296  // update the other row start
02297  for (int p = 0; p < nproc; p++)
02298   {
02299    for (int i = 0; i < dist_nrow_local[p]; i++)
02300     {
02301      unsigned j = dist_first_row[p] + i;
02302      global_row_start[j]+=nnz_offset[p];
02303     }
02304   }
02305 
02306  // create the global distribution
02307  LinearAlgebraDistribution* dist_pt = new 
02308   LinearAlgebraDistribution(Distribution_pt->communicator_pt(),nrow,false);
02309  
02310  // create the matrix
02311  CRDoubleMatrix* matrix_pt = new CRDoubleMatrix(dist_pt);
02312  
02313  // copy of distribution taken so delete
02314  delete dist_pt;
02315 
02316  // pass data into matrix
02317  matrix_pt->build_matrix_without_copy(this->ncol(),nnz_global,global_value,
02318                                         global_column_index,global_row_start);
02319 
02320  // clean up
02321  delete dist_first_row;
02322  delete dist_nrow_local;
02323  delete nnz_offset;
02324  delete dist_nnz_pt;
02325  
02326  // and return
02327  return matrix_pt;
02328 #else
02329  return new CRDoubleMatrix(*this);
02330 #endif
02331 }
02332 
02333  //============================================================================
02334  /// The contents of the matrix are redistributed to match the new
02335  /// distribution. In a non-MPI build this method does nothing. \n
02336  /// \b NOTE 1: The current distribution and the new distribution must have
02337  /// the same number of global rows.\n
02338  /// \b NOTE 2: The current distribution and the new distribution must have
02339  /// the same Communicator.
02340  //============================================================================
02341  void CRDoubleMatrix::redistribute(const LinearAlgebraDistribution* 
02342                                  const& dist_pt)
02343  {
02344 #ifdef OOMPH_HAS_MPI
02345 #ifdef PARANOID
02346    // paranoid check that the nrows for both distributions is the 
02347    // same
02348    if (dist_pt->nrow() != Distribution_pt->nrow())
02349     {
02350      std::ostringstream error_message;    
02351      error_message << "The number of global rows in the new distribution ("
02352                    << dist_pt->nrow() << ") is not equal to the number"
02353                    << " of global rows in the current distribution ("
02354                    << Distribution_pt->nrow() << ").\n"; 
02355      throw OomphLibError(error_message.str(),
02356                          "CRDoubleMatrix::redistribute(...)",
02357                          OOMPH_EXCEPTION_LOCATION);
02358     }
02359    // paranoid check that the current distribution and the new distribution
02360    // have the same Communicator
02361    OomphCommunicator temp_comm(*dist_pt->communicator_pt());
02362    if (!(temp_comm == *Distribution_pt->communicator_pt()))
02363     {
02364      std::ostringstream error_message;  
02365      error_message << "The new distribution and the current distribution must "
02366                    << "have the same communicator.";
02367      throw OomphLibError(error_message.str(),
02368                          "CRDoubleMatrix::redistribute(...)",
02369                          OOMPH_EXCEPTION_LOCATION);
02370     }
02371    // paranoid check that the matrix is build
02372    if (!this->built())
02373     {
02374      std::ostringstream error_message;  
02375      error_message << "The matrix must be build to be redistributed";
02376      throw OomphLibError(error_message.str(),
02377                          "CRDoubleMatrix::redistribute(...)",
02378                          OOMPH_EXCEPTION_LOCATION);     
02379     }
02380 #endif
02381 
02382    // if the two distributions are not the same
02383    // =========================================
02384    if (!((*Distribution_pt) == *dist_pt))
02385     {
02386      
02387      // current data
02388      int* current_row_start = this->row_start();
02389      int* current_column_index = this->column_index();
02390      double* current_value = this->value();
02391 
02392      // get the rank and the number of processors
02393      int my_rank = Distribution_pt->communicator_pt()->my_rank();
02394      int nproc = Distribution_pt->communicator_pt()->nproc();
02395 
02396      // if both distributions are distributed
02397      // =====================================
02398      if (this->distributed() && dist_pt->distributed())
02399       {
02400 
02401        // new nrow_local and first_row data
02402        Vector<unsigned> new_first_row(nproc);
02403        Vector<unsigned> new_nrow_local(nproc);
02404        Vector<unsigned> current_first_row(nproc);
02405        Vector<unsigned> current_nrow_local(nproc);
02406        for (int i = 0; i < nproc; i++)
02407         {
02408          new_first_row[i] = dist_pt->first_row(i);
02409          new_nrow_local[i] = dist_pt->nrow_local(i);
02410          current_first_row[i] = this->first_row(i);
02411          current_nrow_local[i] = this->nrow_local(i);
02412         }
02413        
02414        // compute which local rows are expected to be received from each 
02415        // processor / sent to each processor
02416        Vector<unsigned> first_row_for_proc(nproc,0);
02417        Vector<unsigned> nrow_local_for_proc(nproc,0);
02418        Vector<unsigned> first_row_from_proc(nproc,0);
02419        Vector<unsigned> nrow_local_from_proc(nproc,0);
02420 
02421        // for every processor compute first_row and nrow_local that will
02422        // will sent and received by this processor
02423        for (int p = 0; p < nproc; p++)
02424         {
02425          // start with data to be sent
02426          if ((new_first_row[p] < (current_first_row[my_rank] +
02427                                        current_nrow_local[my_rank])) &&
02428              (current_first_row[my_rank] < (new_first_row[p] +
02429                                                  new_nrow_local[p])))
02430          {
02431           first_row_for_proc[p] = 
02432            std::max(current_first_row[my_rank],
02433                     new_first_row[p]);
02434           nrow_local_for_proc[p] = 
02435            std::min((current_first_row[my_rank] +
02436                      current_nrow_local[my_rank]),
02437                     (new_first_row[p] +
02438                      new_nrow_local[p])) - first_row_for_proc[p];
02439          }
02440          
02441          // and data to be received
02442          if ((new_first_row[my_rank] < (current_first_row[p] +
02443                                                  current_nrow_local[p])) 
02444              && (current_first_row[p] < (new_first_row[my_rank] +
02445                                               new_nrow_local[my_rank])))
02446          {
02447           first_row_from_proc[p] = 
02448            std::max(current_first_row[p],
02449                     new_first_row[my_rank]);
02450           nrow_local_from_proc[p] = 
02451            std::min((current_first_row[p] +
02452                      current_nrow_local[p]),
02453                     (new_first_row[my_rank] +
02454                      new_nrow_local[my_rank]))-first_row_from_proc[p];
02455          }         
02456         }
02457 
02458        // determine how many nnzs to send to each processor
02459        Vector<unsigned> nnz_for_proc(nproc,0);
02460        for (int p = 0; p < nproc; p++)
02461         {
02462          if (nrow_local_for_proc[p] > 0)
02463           {
02464            nnz_for_proc[p] = (current_row_start[first_row_for_proc[p]-
02465                                                 current_first_row[my_rank]+
02466                                                 nrow_local_for_proc[p]]-
02467                               current_row_start[first_row_for_proc[p]-
02468                                                 current_first_row[my_rank]]);
02469           }
02470         }
02471 
02472        // next post non-blocking sends and recv for the nnzs
02473        Vector<unsigned> nnz_from_proc(nproc,0);
02474        Vector<MPI_Request> send_req;
02475        Vector<MPI_Request> nnz_recv_req;
02476        for (int p = 0; p < nproc; p++)
02477         {
02478          if (p != my_rank)
02479           {
02480            // send
02481            if (nrow_local_for_proc[p] > 0)
02482             {
02483              MPI_Request req;
02484              MPI_Isend(&nnz_for_proc[p],1,MPI_UNSIGNED,p,0,
02485                        Distribution_pt->communicator_pt()->mpi_comm(),&req);
02486              send_req.push_back(req);
02487             }
02488            
02489            // recv
02490            if (nrow_local_from_proc[p] > 0)
02491             {
02492              MPI_Request req;
02493              MPI_Irecv(&nnz_from_proc[p],1,MPI_UNSIGNED,p,0,
02494                        Distribution_pt->communicator_pt()->mpi_comm(),&req);
02495              nnz_recv_req.push_back(req);
02496             }
02497           }
02498          // "send to self"
02499          else
02500           {
02501            nnz_from_proc[p] = nnz_for_proc[p];
02502           }
02503         }
02504 
02505        // allocate new storage for the new row_start
02506        int* new_row_start = new int[new_nrow_local[my_rank]+1];
02507 
02508        // wait for recvs to complete
02509        unsigned n_recv_req = nnz_recv_req.size();
02510        if (n_recv_req > 0)
02511         {
02512          Vector<MPI_Status> recv_status(n_recv_req);
02513          MPI_Waitall(n_recv_req,&nnz_recv_req[0],&recv_status[0]);
02514         }
02515        
02516        // compute the nnz offset for each processor
02517        unsigned next_row = 0;
02518        unsigned nnz_count = 0;
02519        Vector<unsigned> nnz_offset(nproc,0);
02520        for (int p = 0; p < nproc; p++)
02521         {
02522          unsigned pp = 0;
02523          while (new_first_row[pp] != next_row) { pp++; }
02524          nnz_offset[pp] = nnz_count;
02525          nnz_count+= nnz_from_proc[pp];
02526          next_row += new_nrow_local[pp];
02527         }
02528 
02529        // allocate storage for the values and column indices
02530        int* new_column_index = new int[nnz_count];
02531        double* new_value = new double[nnz_count];
02532 
02533        // post the sends and recvs for the matrix data
02534        Vector<MPI_Request> recv_req;
02535        MPI_Aint base_address;
02536        MPI_Address(new_value,&base_address);
02537        for (int p = 0; p < nproc; p++)
02538         {
02539          // communicated with other processors
02540          if (p != my_rank)
02541           {
02542            // SEND
02543            if (nrow_local_for_proc[p] > 0)
02544             {
02545              // array of datatypes
02546              MPI_Datatype types[3];
02547 
02548              // array of offsets
02549              MPI_Aint offsets[3];
02550 
02551              // array of lengths
02552              int len[3];
02553 
02554              // row start
02555              unsigned first_row_to_send = first_row_for_proc[p] - 
02556               current_first_row[my_rank];
02557              MPI_Type_contiguous(nrow_local_for_proc[p],MPI_INT,
02558                                  &types[0]);
02559              MPI_Type_commit(&types[0]);
02560              len[0] = 1;
02561              MPI_Address(current_row_start+first_row_to_send,&offsets[0]);
02562              offsets[0] -= base_address;
02563 
02564              // values
02565              unsigned first_coef_to_send 
02566               = current_row_start[first_row_to_send];             
02567              MPI_Type_contiguous(nnz_for_proc[p],MPI_DOUBLE,
02568                                  &types[1]);
02569              MPI_Type_commit(&types[1]);
02570              len[1] = 1;
02571              MPI_Address(current_value+first_coef_to_send,&offsets[1]);
02572              offsets[1] -= base_address;
02573 
02574              // column index
02575              MPI_Type_contiguous(nnz_for_proc[p],MPI_DOUBLE,
02576                                  &types[2]);
02577              MPI_Type_commit(&types[2]);
02578              len[2] = 1;
02579              MPI_Address(current_column_index+first_coef_to_send,&offsets[2]);
02580              offsets[2] -= base_address;
02581 
02582              // build the combined datatype
02583              MPI_Datatype send_type;
02584              MPI_Type_struct(3,len,offsets,types,&send_type);
02585              MPI_Type_commit(&send_type);
02586              MPI_Type_free(&types[0]);
02587              MPI_Type_free(&types[1]);
02588              MPI_Type_free(&types[2]);
02589 
02590              // and send
02591              MPI_Request req;
02592              MPI_Isend(new_value,1,send_type,p,1,
02593                        Distribution_pt->communicator_pt()->mpi_comm(),&req);
02594              send_req.push_back(req);
02595              MPI_Type_free(&send_type);
02596             }
02597 
02598            // RECV
02599            if (nrow_local_from_proc[p] > 0)
02600             {
02601              // array of datatypes
02602              MPI_Datatype types[3];
02603 
02604              // array of offsets
02605              MPI_Aint offsets[3];
02606 
02607              // array of lengths
02608              int len[3];
02609 
02610              // row start
02611              unsigned first_row_to_recv = first_row_from_proc[p] - 
02612               new_first_row[my_rank];
02613              MPI_Type_contiguous(nrow_local_from_proc[p],MPI_INT,
02614                                  &types[0]);
02615              MPI_Type_commit(&types[0]);
02616              len[0] = 1;
02617              MPI_Address(new_row_start+first_row_to_recv,&offsets[0]);
02618              offsets[0] -= base_address;
02619 
02620              // values
02621              unsigned first_coef_to_recv = nnz_offset[p];
02622              MPI_Type_contiguous(nnz_from_proc[p],MPI_DOUBLE,
02623                                  &types[1]);
02624              MPI_Type_commit(&types[1]);
02625              len[1] = 1;
02626              MPI_Address(new_value+first_coef_to_recv,&offsets[1]);
02627              offsets[1] -= base_address;
02628 
02629              // column index
02630              MPI_Type_contiguous(nnz_from_proc[p],MPI_INT,
02631                                  &types[2]);
02632              MPI_Type_commit(&types[2]);
02633              len[2] = 1;
02634              MPI_Address(new_column_index+first_coef_to_recv,&offsets[2]);
02635              offsets[2] -= base_address;
02636 
02637              // build the combined datatype
02638              MPI_Datatype recv_type;
02639              MPI_Type_struct(3,len,offsets,types,&recv_type);
02640              MPI_Type_commit(&recv_type);
02641              MPI_Type_free(&types[0]);
02642              MPI_Type_free(&types[1]);
02643              MPI_Type_free(&types[2]);
02644 
02645              // and send
02646              MPI_Request req;
02647              MPI_Irecv(new_value,1,recv_type,p,1,
02648                        Distribution_pt->communicator_pt()->mpi_comm(),&req);
02649              recv_req.push_back(req);
02650              MPI_Type_free(&recv_type);
02651             }
02652           }
02653          // other wise transfer data internally 
02654          else 
02655           {
02656            unsigned j = first_row_for_proc[my_rank] - 
02657             current_first_row[my_rank];
02658            unsigned k = first_row_from_proc[my_rank] - 
02659             new_first_row[my_rank];
02660            for (unsigned i = 0; i < nrow_local_for_proc[my_rank]; i++)
02661             {
02662              new_row_start[k+i] = current_row_start[j+i];
02663             }
02664            unsigned first_coef_to_send = current_row_start[j];
02665            for (unsigned i = 0; i < nnz_for_proc[my_rank]; i++)
02666             {
02667              new_value[nnz_offset[p]+i]=current_value[first_coef_to_send+i];
02668             new_column_index[nnz_offset[p]+i]
02669              =current_column_index[first_coef_to_send+i];
02670             }
02671           }
02672         }
02673 
02674        // wait for all recvs to complete
02675        n_recv_req = recv_req.size();
02676        if (n_recv_req > 0)
02677         {
02678          Vector<MPI_Status> recv_status(n_recv_req);
02679          MPI_Waitall(n_recv_req,&recv_req[0],&recv_status[0]);
02680         }
02681 
02682        // next we need to update the row starts
02683        for (int p = 0; p < nproc; p++)
02684         {
02685          if (nrow_local_from_proc[p] > 0)
02686           {
02687            unsigned first_row = first_row_from_proc[p]-new_first_row[my_rank];
02688            unsigned last_row = first_row + nrow_local_from_proc[p]-1;
02689            int update = nnz_offset[p] - new_row_start[first_row];
02690             for (unsigned i = first_row; i <= last_row; i++)
02691             {
02692              new_row_start[i]+=update;
02693             }
02694           }
02695         }
02696        new_row_start[dist_pt->nrow_local()] = nnz_count;
02697 
02698        // wait for sends to complete
02699        unsigned n_send_req = send_req.size();
02700        if (n_recv_req > 0)
02701         {
02702          Vector<MPI_Status> send_status(n_recv_req);
02703          MPI_Waitall(n_send_req,&send_req[0],&send_status[0]);
02704         }
02705        if (my_rank == 0)
02706         {
02707          CRDoubleMatrix* m_pt = this->return_global_matrix();
02708          m_pt->sparse_indexed_output("m1.dat");
02709         }
02710 
02711        // 
02712        this->build(dist_pt);
02713        this->build_matrix_without_copy(dist_pt->nrow(),nnz_count,
02714                                        new_value,new_column_index,
02715                                        new_row_start);
02716        if (my_rank == 0)
02717         {
02718          CRDoubleMatrix* m_pt = this->return_global_matrix();
02719          m_pt->sparse_indexed_output("m2.dat");
02720         }
02721 //       this->sparse_indexed_output(oomph_info);
02722        abort();
02723       }
02724      
02725      // if this matrix is distributed but the new distributed matrix is global
02726      // ======================================================================
02727      else if (this->distributed() && !dist_pt->distributed())
02728       {
02729 
02730        // nnz 
02731        int nnz = this->nnz();
02732        
02733        // nrow global
02734        unsigned nrow = this->nrow();
02735        
02736        // cache nproc
02737        int nproc = Distribution_pt->communicator_pt()->nproc();
02738        
02739        // get the nnzs on the other processors
02740        int* dist_nnz_pt = new int[nproc];
02741        MPI_Allgather(&nnz,1,MPI_INT,
02742                      dist_nnz_pt,1,MPI_INT,
02743                      Distribution_pt->communicator_pt()->mpi_comm());
02744        
02745        // create an int array of first rows and nrow local and 
02746        // compute nnz global
02747        int* dist_first_row = new int[nproc];
02748        int* dist_nrow_local =  new int[nproc];
02749        for (int p = 0; p < nproc; p++)
02750         {
02751          dist_first_row[p] = this->first_row(p);
02752          dist_nrow_local[p] = this->nrow_local(p);
02753         }
02754        
02755        // conpute the offset for the values and column index data
02756        // compute the nnz offset for each processor
02757        int next_row = 0;
02758        unsigned nnz_count = 0;
02759        Vector<unsigned> nnz_offset(nproc,0);
02760        for (int p = 0; p < nproc; p++)
02761         {
02762          unsigned pp = 0;
02763          while (dist_first_row[pp] != next_row) { pp++; }
02764          nnz_offset[pp] = nnz_count;
02765          nnz_count+=dist_nnz_pt[pp];
02766          next_row+=dist_nrow_local[pp];
02767         }
02768        
02769        // get pointers to the (current) distributed data
02770        int* dist_row_start = this->row_start();
02771        int* dist_column_index = this->column_index();
02772        double* dist_value = this->value();
02773        
02774        // space for the global matrix
02775        int* global_row_start = new int[nrow+1];
02776        int* global_column_index = new int[nnz_count];
02777        double* global_value = new double[nnz_count];
02778 
02779        // post the sends and recvs for the matrix data
02780        Vector<MPI_Request> recv_req;
02781        Vector<MPI_Request> send_req;
02782        MPI_Aint base_address;
02783        MPI_Address(global_value,&base_address);
02784 
02785        // SEND
02786        if (dist_nrow_local[my_rank] > 0)
02787         {
02788          // types
02789          MPI_Datatype types[3];
02790          
02791          // offsets
02792          MPI_Aint offsets[3];
02793 
02794          // lengths
02795          int len[3];
02796 
02797          // row start
02798          MPI_Type_contiguous(dist_nrow_local[my_rank],MPI_INT,&types[0]);
02799          MPI_Type_commit(&types[0]);
02800          MPI_Address(dist_row_start,&offsets[0]);
02801          offsets[0] -= base_address;
02802          len[0] = 1;
02803 
02804          // value
02805          MPI_Type_contiguous(nnz,MPI_DOUBLE,&types[1]);
02806          MPI_Type_commit(&types[1]);
02807          MPI_Address(dist_value,&offsets[1]);
02808          offsets[1] -= base_address;
02809          len[1] = 1;
02810 
02811          // column indices
02812          MPI_Type_contiguous(nnz,MPI_INT,&types[2]);
02813          MPI_Type_commit(&types[2]);
02814          MPI_Address(dist_column_index,&offsets[2]);
02815          offsets[2] -= base_address;
02816          len[2] = 1;
02817 
02818          // build the send type
02819          MPI_Datatype send_type;
02820          MPI_Type_struct(3,len,offsets,types,&send_type);
02821          MPI_Type_commit(&send_type);
02822          MPI_Type_free(&types[0]);
02823          MPI_Type_free(&types[1]);
02824          MPI_Type_free(&types[2]);
02825          
02826          // and send 
02827          for (int p = 0; p < nproc; p++)
02828           {
02829            if (p != my_rank)
02830             {
02831              MPI_Request req;
02832              MPI_Isend(global_value,1,send_type,p,1,
02833                        Distribution_pt->communicator_pt()->mpi_comm(),&req);
02834              send_req.push_back(req);
02835             }
02836           }
02837          MPI_Type_free(&send_type);
02838         }
02839       
02840        // RECV
02841        for (int p = 0; p < nproc; p++)
02842         {
02843          // communicated with other processors
02844          if (p != my_rank)
02845           {
02846            // RECV
02847            if (dist_nrow_local[p] > 0)
02848             {
02849 
02850              // types
02851              MPI_Datatype types[3];
02852              
02853              // offsets
02854              MPI_Aint offsets[3];
02855              
02856              // lengths
02857              int len[3];
02858              
02859              // row start
02860              MPI_Type_contiguous(dist_nrow_local[p],MPI_INT,&types[0]);
02861              MPI_Type_commit(&types[0]);
02862              MPI_Address(global_row_start+dist_first_row[p],&offsets[0]);
02863              offsets[0] -= base_address;
02864              len[0] = 1;
02865              
02866              // value
02867              MPI_Type_contiguous(dist_nnz_pt[p],MPI_DOUBLE,&types[1]);
02868              MPI_Type_commit(&types[1]);
02869              MPI_Address(global_value+nnz_offset[p],&offsets[1]);
02870              offsets[1] -= base_address;
02871              len[1] = 1;
02872              
02873              // column indices
02874              MPI_Type_contiguous(dist_nnz_pt[p],MPI_INT,&types[2]);
02875              MPI_Type_commit(&types[2]);
02876              MPI_Address(global_column_index+nnz_offset[p],&offsets[2]);
02877              offsets[2] -= base_address;
02878              len[2] = 1;
02879 
02880              // build the send type
02881              MPI_Datatype recv_type;
02882              MPI_Type_struct(3,len,offsets,types,&recv_type);
02883              MPI_Type_commit(&recv_type);
02884              MPI_Type_free(&types[0]);
02885              MPI_Type_free(&types[1]);
02886              MPI_Type_free(&types[2]);
02887          
02888              // and send 
02889              MPI_Request req;
02890              MPI_Irecv(global_value,1,recv_type,p,1,
02891                        Distribution_pt->communicator_pt()->mpi_comm(),&req);
02892              recv_req.push_back(req);
02893             }
02894           }
02895          // otherwise send to self
02896          else
02897           {
02898            unsigned nrow_local = dist_nrow_local[my_rank];
02899            unsigned first_row = dist_first_row[my_rank];
02900            for (unsigned i = 0; i < nrow_local; i++)
02901             {
02902              global_row_start[first_row+i]=dist_row_start[i];
02903             }
02904            unsigned offset = nnz_offset[my_rank];
02905            for (int i = 0; i < nnz; i++)
02906             {
02907              global_value[offset+i]=dist_value[i];
02908              global_column_index[offset+i]=dist_column_index[i];
02909             }
02910           }
02911         }
02912          
02913        // wait for all recvs to complete
02914        unsigned n_recv_req = recv_req.size();
02915        if (n_recv_req > 0)
02916         {
02917          Vector<MPI_Status> recv_status(n_recv_req);
02918          MPI_Waitall(n_recv_req,&recv_req[0],&recv_status[0]);
02919         }
02920 
02921        // finally the last row start
02922        global_row_start[nrow] = nnz_count;
02923        
02924        // update the other row start
02925        for (int p = 0; p < nproc; p++)
02926         {
02927          for (int i = 0; i < dist_nrow_local[p]; i++)
02928           {
02929            unsigned j = dist_first_row[p] + i;
02930            global_row_start[j]+=nnz_offset[p];
02931           }
02932         }
02933        
02934        // wait for sends to complete
02935        unsigned n_send_req = send_req.size();
02936        if (n_recv_req > 0)
02937         {
02938          Vector<MPI_Status> send_status(n_recv_req);
02939          MPI_Waitall(n_send_req,&send_req[0],&send_status[0]);
02940         }
02941 
02942        // rebuild the matrix
02943        LinearAlgebraDistribution* dist_pt = new 
02944         LinearAlgebraDistribution(Distribution_pt->communicator_pt(),
02945                                   nrow,false);
02946        this->build(dist_pt);
02947        this->build_matrix_without_copy(dist_pt->nrow(),nnz_count,
02948                                        global_value,global_column_index,
02949                                        global_row_start);
02950 
02951        // clean up
02952        delete dist_first_row;
02953        delete dist_nrow_local;
02954        delete dist_nnz_pt;
02955       }
02956 
02957      // other the matrix is not distributed but it needs to be turned 
02958      // into a distributed matrix
02959      // =============================================================
02960      else if (!this->distributed() && dist_pt->distributed())
02961       {   
02962 
02963        // cache the new nrow_local
02964        unsigned nrow_local = dist_pt->nrow_local();
02965        
02966        // and first_row
02967        unsigned first_row = dist_pt->first_row();
02968 
02969        // get pointers to the (current) distributed data
02970        int* global_row_start = this->row_start();
02971        int* global_column_index = this->column_index();
02972        double* global_value = this->value();
02973 
02974        // determine the number of non zeros required by this processor
02975        unsigned nnz = global_row_start[first_row+nrow_local] - 
02976         global_row_start[first_row];
02977 
02978        // allocate
02979        int* dist_row_start = new int[nrow_local+1];
02980        int* dist_column_index = new int[nnz];
02981        double* dist_value = new double[nnz];
02982        
02983        // copy
02984        int offset = global_row_start[first_row];
02985        for (unsigned i = 0; i <= nrow_local; i++)
02986         {
02987          dist_row_start[i] = global_row_start[first_row+1]-offset;
02988         }
02989        for (unsigned i = 0; i < nnz; i++)
02990         {
02991          dist_column_index[i] = global_column_index[offset+i];
02992          dist_value[i] = global_value[offset+i];
02993         }
02994 
02995        // rebuild
02996        this->build(dist_pt);
02997        this->build_matrix_without_copy(dist_pt->nrow(),nnz,dist_value,
02998                                        dist_column_index,dist_row_start);
02999       }
03000     }
03001 #endif
03002  }
03003 }

Generated on Mon Aug 10 11:23:47 2009 by  doxygen 1.4.7