block_preconditioner.cc
Go to the documentation of this file.
1 //LIC// ====================================================================
2 //LIC// This file forms part of oomph-lib, the object-oriented,
3 //LIC// multi-physics finite-element library, available
4 //LIC// at http://www.oomph-lib.org.
5 //LIC//
6 //LIC// Version 1.0; svn revision $LastChangedRevision: 1282 $
7 //LIC//
8 //LIC// $LastChangedDate: 2017-01-16 08:27:53 +0000 (Mon, 16 Jan 2017) $
9 //LIC//
10 //LIC// Copyright (C) 2006-2016 Matthias Heil and Andrew Hazel
11 //LIC//
12 //LIC// This library is free software; you can redistribute it and/or
13 //LIC// modify it under the terms of the GNU Lesser General Public
14 //LIC// License as published by the Free Software Foundation; either
15 //LIC// version 2.1 of the License, or (at your option) any later version.
16 //LIC//
17 //LIC// This library is distributed in the hope that it will be useful,
18 //LIC// but WITHOUT ANY WARRANTY; without even the implied warranty of
19 //LIC// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 //LIC// Lesser General Public License for more details.
21 //LIC//
22 //LIC// You should have received a copy of the GNU Lesser General Public
23 //LIC// License along with this library; if not, write to the Free Software
24 //LIC// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
25 //LIC// 02110-1301 USA.
26 //LIC//
27 //LIC// The authors may be contacted at oomph-lib@maths.man.ac.uk.
28 //LIC//
29 //LIC//====================================================================
30 #include "block_preconditioner.h"
31 
32 namespace oomph
33 {
34 
35  /// \short Static boolean to allow block_matrix_test(...) to be run.
36  /// Defaults to false.
37  template<typename MATRIX>
38  bool BlockPreconditioner<MATRIX>::Run_block_matrix_test=false;
39 
40 
41 
42  //============================================================================
43  /// Determine the size of the matrix blocks and setup the
44  /// lookup schemes relating the global degrees of freedom with
45  /// their "blocks" and their indices (row/column numbers) in those
46  /// blocks.
47  /// The distributions of the preconditioner and the blocks are
48  /// automatically specified (and assumed to be uniform) at this
49  /// stage.
50  /// This method should be used if any block contains more than one
51  /// type of DOF. The argument vector dof_to_block_map should be of length
52  /// ndof. Each element should contain an integer indicating the block number
53  /// corresponding to that type of DOF.
54  //============================================================================
55  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
56  block_setup(const Vector<unsigned>& dof_to_block_map_in)
57  {
58 
59 
60 #ifdef PARANOID
61  // Subsidiary preconditioners don't really need the meshes
62  if (this->is_master_block_preconditioner())
63  {
64  std::ostringstream err_msg;
65  unsigned n=nmesh();
66  if (n==0)
67  {
68  err_msg << "No meshes have been set for this block preconditioner!\n"
69  << "Set one with set_nmesh(...), set_mesh(...)" << std::endl;
70  throw OomphLibError(err_msg.str(),
71  OOMPH_CURRENT_FUNCTION,
72  OOMPH_EXCEPTION_LOCATION);
73  for (unsigned m=0;m<n;m++)
74  {
75  if (Mesh_pt[m]==0)
76  {
77  err_msg << "The mesh pointer to mesh " << m << " is null!\n"
78  << "Set a non-null one with set_mesh(...)" << std::endl;
79  throw OomphLibError(err_msg.str(),
80  OOMPH_CURRENT_FUNCTION,
81  OOMPH_EXCEPTION_LOCATION);
82 
83  }
84  }
85  }
86  }
87 #endif
88 
89  // Create a copy of the vector input so that we can modify it below
90  Vector<unsigned> dof_to_block_map = dof_to_block_map_in;
91 
92  if(is_subsidiary_block_preconditioner())
93  {
94 #ifdef PARANOID
95  // Get the size of the Doftype_in_master_preconditioner_coarse.
96  unsigned para_doftype_in_master_preconditioner_coarse_size
97  = Doftype_in_master_preconditioner_coarse.size();
98 
99  // Check that the Doftype_in_master_preconditioner_coarse vector is not
100  // empty. This must be set (via the function
101  // turn_into_subsidiary_block_preconditioner) if this is a
102  // subsidiary block preconditioner.
103  if(para_doftype_in_master_preconditioner_coarse_size == 0)
104  {
105  std::ostringstream err_msg;
106  err_msg << "The mapping from the dof types of the master "
107  << "block preconditioner \n"
108  << "to the subsidiary block preconditioner is empty.\n"
109  << "Doftype_in_master_preconditioner_coarse.size() == 0 \n"
110  << "has turn_into_subsidiary_block_preconditioner(...)\n"
111  << "been called with the correct parameters?\n"
112  << std::endl;
113  throw OomphLibError(err_msg.str(),
114  OOMPH_CURRENT_FUNCTION,
115  OOMPH_EXCEPTION_LOCATION);
116  }
117 
118 
119  // PARANOID checks for Doftype_coarsen_map_coarse
120  // This is also set in the function
121  // turn_into_subsidiary_block_preconditioner(...).
122  //
123  // The Doftype_coarsen_map_coarse vector must satisfy two conditions
124  // for it to be valid.
125  //
126  // 1) The dof type numbers in the dof_coarsen_map vector must be
127  // unique. For example, it does not make sense to have the vector
128  // [[0,1][1,2]] because the first inner vector says
129  // "treat dof types 0 and 1 as dof type 0" and the second inner vector
130  // says "treat dof type 1 and 2 as dof type 1", but dof type 1 is already
131  // being treated as dof type 0.
132  //
133  // 2) Every SUBSIDIARY dof type must be mapped to a dof type in the
134  // Doftype_coarsen_map_coarse vector.
135  // For example, if there are 5 dof types (passed down from the master
136  // block preconditioner), and this block subsidiary block preconditioner
137  // only deals with 3 dof types, then all 5 dof types must be mapped to a
138  // dof type in the subsidiary preconditioner. For example if the dof_map
139  // is [1,2,3,4,5], then the subsidiary block preconditioner knows that 5
140  // dof types have been passed down. But if it only works with three dof
141  // types, we MUST have three inner vectors in the doftype_coarsen_map
142  // vector (which corresponds to dof types 0, 1 and 2), the union of the
143  // dof types in the three inner vectors must contain dof types 0, 1, 2,
144  // 3 and 4 exactly once. It cannot contain, say, 0, 1, 5, 7, 9, even
145  // though it passes the uniqueness check. We ensure this by two
146  // conditions:
147  //
148  // 2.1) The Doftype_coarsen_map_coarse vector must contain the same
149  // number of dof types as the dof_map vector.
150  // In other words, recall that Doftype_coarsen_map_coarse is a
151  // 2D vector, this must contain the same number of vectors as
152  // there are elements in the dof_to_block_map_in vector.
153  //
154  // 2.2) The maximum element in the doftype_coarsen_map_coarse vector
155  // is the length of the dof_map vector minus 1.
156 
157  // A set is deal for checking the above three conditions, we shall insert
158  // all the elements in the doftype_coarsen_map_coarse vector into this set.
159  std::set<unsigned> doftype_map_set;
160 
161  // Condition (1): Check for uniqueness by inserting all the values of
162  // Doftype_coarsen_map_coarse into a set.
163  unsigned para_doftype_coarsen_map_coarse_size
164  = Doftype_coarsen_map_coarse.size();
165 
166  // Loop through the outer vector of Doftype_coarsen_map_coarse
167  // then loop through the inner vectors and attempt to insert each
168  // element of Doftype_coarsen_map_coarse into doftype_map_set.
169  //
170  // The inner for loop will throw an error if we cannot insert the
171  // element, this means that it is already inserted and thus not unique.
172  for (unsigned i = 0; i < para_doftype_coarsen_map_coarse_size; i++)
173  {
174  // Loop through the inner vector
175  unsigned para_doftype_coarsen_map_coarse_i_size
176  = Doftype_coarsen_map_coarse[i].size();
177  for (unsigned j = 0; j < para_doftype_coarsen_map_coarse_i_size; j++)
178  {
179  // Attempt to insert all the values of the inner vector into a set.
180  std::pair<std::set<unsigned>::iterator,bool> doftype_map_ret
181  = doftype_map_set.insert(Doftype_coarsen_map_coarse[i][j]);
182 
183  if(!doftype_map_ret.second)
184  {
185  std::ostringstream err_msg;
186  err_msg << "Error: the doftype number "
187  << Doftype_coarsen_map_coarse[i][j]
188  << " is already inserted."
189  << std::endl;
190  throw OomphLibError(err_msg.str(),
191  OOMPH_CURRENT_FUNCTION,
192  OOMPH_EXCEPTION_LOCATION);
193  }
194  }
195  }
196 
197  // Condition (2.1): Check that the doftype_map_set describes as many values
198  // as doftype_in_master_preconditioner_coarse.
199  // I.e. if dof_map contains 5 dof types, then the
200  // doftype_coarsen_map_coarse vector must also contain 5 dof types.
201  if(para_doftype_in_master_preconditioner_coarse_size
202  != doftype_map_set.size())
203  {
204  std::ostringstream err_msg;
205  err_msg << "The size of doftype_in_master_preconditioner_coarse "
206  << "must be the same as the total\n"
207  << "number of values in the doftype_coarsen_map_coarse vector."
208  << std::endl;
209  throw OomphLibError(err_msg.str(),
210  OOMPH_CURRENT_FUNCTION,
211  OOMPH_EXCEPTION_LOCATION);
212  }
213 
214  // Condition (2.2): Check that the maximum element in the
215  // doftype_coarsen_map_coarse vector is the length of the
216  // doftype_in_master_preconditioner_coarse minus 1.
217  unsigned para_doftype_in_master_preconditioner_coarse_size_minus_one
218  = para_doftype_in_master_preconditioner_coarse_size - 1;
219  if(para_doftype_in_master_preconditioner_coarse_size_minus_one
220  != *doftype_map_set.rbegin())
221  {
222  std::ostringstream err_msg;
223  err_msg << "The maximum dof type number in the "
224  << "doftype_coarsen_map vector must be "
225  << para_doftype_in_master_preconditioner_coarse_size_minus_one
226  << std::endl;
227  throw OomphLibError(err_msg.str(),
228  OOMPH_CURRENT_FUNCTION,
229  OOMPH_EXCEPTION_LOCATION);
230  }
231 #endif
232 
233  // Set the mapping from the master preconditioner DOF types to the
234  // subsidiary preconditioner DOF types.
235  //
236  // IMPORTANT: Since DOF types may be coarsened in the master block
237  // preconditioner, this may no longer reflect the actual underlying dof
238  // types. We must get the actual underlying dof types for the
239  // block_setup(...) function to work properly so all the look up schemes
240  // for this (subsidiary) block preconditioner is correct and works
241  // properly, this is for backwards compatibility purposes and to make sure
242  // Richard Muddle's still works at this (subsidiary) level, although it may
243  // not be used.
244  //
245  // If we do not want to make it backwards compatible, we may as well
246  // kill the block_setup(...) for subsidiary block preconditioners -
247  // but other thing may break. Do it at your own risk (take time to
248  // fully understand the whole block preconditioning framework code).
249 
250  // Create the corresponding Doftype_in_master_preconditioner_fine and
251  // Doftype_coarsen_map_fine vectors.
252 
253  // First resize the vectors.
254  Doftype_in_master_preconditioner_fine.resize(0);
255  Doftype_coarsen_map_fine.resize(0);
256 
257  // The Doftype_in_master_preconditioner_fine vector is easy. We know that
258  // the Doftype_coarsen_map_fine in the master preconditioner must be
259  // constructed already. So we simply loop through the values in
260  // doftype_in_master_preconditioner_coarse, then get the most fine grain
261  // dof types from the master preconditioner's Doftype_coarsen_map_fine
262  // vector.
263  //
264  // For example, if the master preconditioner has the vector:
265  // Doftype_coarsen_map_fine = [0,1,2,3][4,5,6,7][8,9,10,11][12,13][14,15]
266  //
267  // and passes the two vectors
268  // doftype_in_master_preconditioner_coarse = [1,2,3]
269  // doftype_coarsen_map_coarse = [[0][1,2]]
270  //
271  // Then we want
272  // Doftype_in_master_preconditioner_fine = [4,5,6,7,8,9,10,11,12,13]
273  //
274  // We achieve this by looking up the corresponding fine dof types in the
275  // masters' Doftype_coarsen_map_fine vector which corresponds to the
276  // values in Doftype_in_master_preconditioner_coarse.
277  //
278  // That is, the values in Doftype_in_master_preconditioner_coarse gives us
279  // the index of sub vector we want in the master's Doftype_coarsen_map_fine
280  // vector.
281 
282 #ifdef PARANOID
283  // Check that the master block preconditioner's Doftype_coarsen_map_fine is
284  // set up. Under the current implementation, this would always be set up
285  // properly, but we check it just in case!
286  if(master_block_preconditioner_pt()->doftype_coarsen_map_fine().size() == 0)
287  {
288  std::ostringstream err_msg;
289  err_msg << "The master block preconditioner's "
290  << "Doftype_coarsen_map_fine is not\n"
291  << "set up properly.\n"
292  << "\n"
293  << "This vector is constructed in the function "
294  << "block_setup(...).\n"
295  << std::endl;
296  throw OomphLibError(err_msg.str(),
297  OOMPH_CURRENT_FUNCTION,
298  OOMPH_EXCEPTION_LOCATION);
299  }
300 #endif
301 
302  unsigned doftype_in_master_preconditioner_coarse_size
303  = Doftype_in_master_preconditioner_coarse.size();
304  for (unsigned i = 0;
305  i < doftype_in_master_preconditioner_coarse_size; i++)
306  {
307  // The index of the sub vector we want.
308  unsigned subvec_index = Doftype_in_master_preconditioner_coarse[i];
309 
310  // Get the corresponding most fine grain sub vector from the master block
311  // preconditioner
312  Vector<unsigned> tmp_master_dof_subvec
313  = Master_block_preconditioner_pt
314  ->get_fine_grain_dof_types_in(subvec_index);
315 
316  Doftype_in_master_preconditioner_fine.insert(
317  Doftype_in_master_preconditioner_fine.end(),
318  tmp_master_dof_subvec.begin(),
319  tmp_master_dof_subvec.end());
320  }
321 
322  // The Doftype_coarsen_map_fine vector is a bit more tricky.
323  // The Doftype_coarsen_map_coarse vector describes which coarse dof types
324  // of THIS preconditioner are grouped together. We have to translate this
325  // into the most fine grain dof types.
326  //
327  // For example, if
328  // Doftype_coarsen_map_coarse = [[0][1,2]]
329  // Doftype_in_master_preconditioner_coarse = [1,2,3]
330  //
331  // and the MASTER preconditioner has:
332  // Doftype_coarsen_map_fine= [[0,1,2,3][4,5,6,7][8,9,10,11][12,13][14,15]]
333  //
334  // Then [[0][1,2]] tell us that the most fine grain DOF types 1 of the
335  // master preconditioner most be grouped together, and the most fine
336  // grained dof types 2 and 3 of the master preconditioner must be grouped
337  // together.
338  //
339  // This gives the vector [[4,5,6,7] [8,9,10,11,12,13]], translating this
340  // into the local DOF types of this preconditioner we have
341  // Doftype_coarsen_map_fine = [[0,1,2,3][4,5,6,7,8,9]]. This corresponds
342  // with the Doftype_in_master_preconditioner_fine vector we created above:
343  // Doftype_in_master_preconditioner_fine = [4,5,6,7,8,9,10,11,12,13]
344  //
345  // Together, the master block preconditioner says to THIS subsidiary block
346  // preconditioner "work on my DOF types [4,5,6,7,8,9,10,11,12,13], but group
347  // your DOF type [0,1,2,3] together as DOF type 0 and [4,5,6,7,8,9] together
348  // together as DOF type 1".
349  //
350  // Think of it like this: For each DOF type in Doftype_coarsen_map_coarse
351  // we look at how many values this corresponds to in the master
352  // preconditioner. In this case, Doftype_coarsen_map_coarse:
353  //
354  // 1 - corresponds to fine DOF types 0,1,2,3 in this preconditioner,
355  // and 4,5,6,7 in the master preconditioner;
356  //
357  // 2 - corresponds to fine DOF types 4,5,6,7 in this preconditioner,
358  // and 8,9,10,11 in the master preconditioner;
359  //
360  // 3 - corresponds to fine DOF types 8,9 in this preconditioner,
361  // and 12,13 in the master preconditioner.
362  //
363  // Thus Doftype_coarsen_map_fine = [[0,1,2,3][4,5,6,7,8,9]]
364  //
365  ////////////////////////////////////////////////////////////////////////
366  //
367  // How to do this: First we create a 2D vector which has the corresponds
368  // to the fine dof types in the master preconditioner but starting from
369  // 0. For example, take the above example (repeated below):
370  // Passed to this prec by the master prec:
371  // Doftype_coarsen_map_coarse = [[0][1,2]]
372  // Doftype_in_master_preconditioner_coarse = [1,2,3]
373  //
374  // and the MASTER preconditioner has:
375  // Doftype_coarsen_map_fine= [[0,1,2,3][4,5,6,7][8,9,10,11][12,13][14,15]]
376  //
377  // Step 1:
378  // Then, the temp 2D vector we want to create is:
379  // master_fine_doftype_translated = [[0 1 2 3], [4,5,6,7], [8,9]]
380  // This comes from using Doftype_in_master_preconditioner_coarse
381  // then get the number of fine dof types in the master.
382  //
383  // Step 2:
384  // Then:
385  // Loop through the vector Doftype_coarsen_map_coarse,
386  // Loop over the inner vectors in Doftype_coarsen_map_coarse
387  // Each element in the inner vector corresponds to a vector in
388  // master_fine_doftype_translated. We push in the vectors of
389  // master_fine_doftype_translated intp Doftype_coarsen_map_fine
390  //
391 
392  Vector<Vector<unsigned> > master_fine_doftype_translated;
393  unsigned dof_type_index = 0;
394  for (unsigned i = 0;
395  i < doftype_in_master_preconditioner_coarse_size; i++)
396  {
397  // How many fine DOF types are in the master's
398  // Doftype_in_master_preconditioner_coarse[i]?
399  unsigned coarse_dof = Doftype_in_master_preconditioner_coarse[i];
400 
401  unsigned n_master_fine_doftypes
402  = Master_block_preconditioner_pt->nfine_grain_dof_types_in(coarse_dof);
403 
404  Vector<unsigned> tmp_sub_vec;
405  for (unsigned j = 0; j < n_master_fine_doftypes; j++)
406  {
407  tmp_sub_vec.push_back(dof_type_index);
408  dof_type_index++;
409  }
410  master_fine_doftype_translated.push_back(tmp_sub_vec);
411  }
412 
413 
414  // master_fine_doftype_translated now contains vectors with values are from
415  // 0, 1, 2, ..,
416  //
417  // Now read out the values of master_fine_doftype_translated and place them in
418  // order according to Doftype_coarsen_map_coarse.
419  unsigned doftype_coarsen_map_coarse_size
420  = Doftype_coarsen_map_coarse.size();
421  for (unsigned i = 0; i < doftype_coarsen_map_coarse_size; i++)
422  {
423  Vector<unsigned> tmp_vec;
424  unsigned doftype_coarsen_map_coarse_i_size
425  = Doftype_coarsen_map_coarse[i].size();
426  for (unsigned j = 0; j < doftype_coarsen_map_coarse_i_size; j++)
427  {
428  unsigned subvec_i = Doftype_coarsen_map_coarse[i][j];
429 
430  tmp_vec.insert(tmp_vec.end(),
431  master_fine_doftype_translated[subvec_i].begin(),
432  master_fine_doftype_translated[subvec_i].end());
433  }
434 
435  Doftype_coarsen_map_fine.push_back(tmp_vec);
436  }
437 
438  // Get the number of block types (and DOF types) in this preconditioner
439  // from the length of the dof_map vector.
440  Internal_ndof_types = Doftype_in_master_preconditioner_fine.size();
441 
442  // Nblock_types is later updated in block_setup(...)
443  Internal_nblock_types = Internal_ndof_types;
444 
445  // Compute number of rows in this (sub) preconditioner using data from
446  // the master.
447  Nrow = 0;
448  for (unsigned b = 0; b < Internal_ndof_types; b++)
449  {
450  Nrow += this->internal_dof_block_dimension(b);
451  }
452 
453 #ifdef PARANOID
454  if (Nrow==0)
455  {
456  std::ostringstream error_message;
457  error_message
458  << "Nrow=0 in subsidiary preconditioner. This seems fishy and\n"
459  << "suggests that block_setup() was not called for the \n"
460  << "master block preconditioner yet.";
461  throw OomphLibWarning(error_message.str(),
462  OOMPH_CURRENT_FUNCTION,
463  OOMPH_EXCEPTION_LOCATION);
464  }
465 #endif
466  }
467 
468  // If this is a master block preconditioner, then set the
469  // Doftype_coarsen_map_fine and Doftype_coarsen_map_coarse to the
470  // identity. Recall that the Doftype_coarsen_map_fine maps the dof types
471  // that this preconditioner requires with the most fine grain dof types (the
472  // internal dof types) and the Doftype_coarsen_map_coarse maps the dof
473  // types that this preconditioner requires with the dof types which this
474  // preconditioner is given from a master preconditioner (these dof types may
475  // or may not be coarsened). In the case of the master preconditioner, these
476  // are the same (since dof types are not coarsened), furthermore the identity
477  // mapping is provided to say that
478  // dof type 0 maps to dof type 0,
479  // dof type 1 maps to dof type 1,
480  // dof type 2 maps to dof type 2,
481  // etc...
482  //
483  // If this is not a master block preconditioner, then the vectors
484  // Doftype_coarsen_map_fine and Doftype_coarsen_map_coarse is handled
485  // by the turn_into_subsidiary_block_preconditioner(...) function.
486  if(is_master_block_preconditioner())
487  {
488  // How many dof types does this preconditioner work with?
489  unsigned n_external_dof_types = dof_to_block_map.size();
490 
491  // Note: at the master level, the n_external_dof_types should be the same as
492  // the internal_ndof_types(), since the dof_to_block_map MUST describe the
493  // mapping between every dof type (not yet coarsened - so it is the same
494  // number as the internal dof types) to the block types. But we distinguish
495  // them for clarity. We also check that this is the case.
496 #ifdef PARANOID
497  unsigned n_internal_dof_types = internal_ndof_types();
498 
499  if (n_internal_dof_types != n_external_dof_types)
500  {
501  std::ostringstream err_msg;
502  err_msg
503  << "The internal ndof types and the length of the dof_to_block_map\n"
504  << "vector is not the same. Since this is the master block "
505  << "preconditioner,\n"
506  << "you must describe which block each DOF type belongs to,\n"
507  << "no more, no less."
508  << "internal_ndof_types = " << n_internal_dof_types << "\n"
509  << "dof_to_block_map.size() = " << n_external_dof_types << "\n";
510  throw OomphLibWarning(err_msg.str(),
511  OOMPH_CURRENT_FUNCTION,
512  OOMPH_EXCEPTION_LOCATION);
513  }
514 #endif
515 
516  // Clear and reserve space.
517  Doftype_coarsen_map_fine.clear();
518  Doftype_coarsen_map_coarse.clear();
519  Doftype_coarsen_map_fine.reserve(n_external_dof_types);
520  Doftype_coarsen_map_coarse.reserve(n_external_dof_types);
521 
522  // Now push back the identity mapping.
523  for (unsigned i = 0; i < n_external_dof_types; i++)
524  {
525  // Create a vector and push it in.
526  Vector<unsigned> tmp_vec(1,i);
527  Doftype_coarsen_map_fine.push_back(tmp_vec);
528  Doftype_coarsen_map_coarse.push_back(tmp_vec);
529  }
530  }
531  else
532  // Else this is a subsidiary block preconditioner.
533  {
534  // Both the Doftype_coarsen_map_fine and Doftype_coarsen_map_coarse
535  // vectors must be already be handled by the
536  // turn_into_subsidiary_block_preconditioner(...) function. We check this.
537 #ifdef PARANOID
538  if( (Doftype_coarsen_map_fine.size() == 0)
539  ||(Doftype_coarsen_map_coarse.size() == 0))
540  {
541  std::ostringstream err_msg;
542  err_msg
543  << "Either the Doftype_coarsen_map_fine or the \n"
544  << "Doftype_coarsen_map_coarse vectors is of size 0.\n"
545  << "Did you remember to call the function "
546  << "turn_into_subsidiary_block_preconditioner(...)?";
547  throw OomphLibWarning(err_msg.str(),
548  OOMPH_CURRENT_FUNCTION,
549  OOMPH_EXCEPTION_LOCATION);
550  }
551 #endif
552  }
553 
554 
555  // Now we create the vector Block_to_dof_map_coarse.
556  // Recall that the vector describe which dof types are in which block with
557  // the relationship:
558  //
559  // Block_to_dof_map_coarse[block_number] = Vector[dof types];
560  //
561  // Note that this is not the internal (underlying) dof type.
562  // Nor is this in relation to the parent block preconditioner's dof type.
563  // The number of elements in it is the same as dof_to_block_map vector.
564  //
565  // Since the dof type coarsening feature is added later, we encapsulate this
566  // bit of the code so it does not affect things below.
567  {
568  // Check that the dof_to_block map "makes sense" for the
569  // Doftype_coarsen_map_coarse.
570  // The Doftype_coarsen_map_coarse describes which doftypes should be
571  // considered as a single doftype in THIS preconditioner.
572  //
573  // For example, if this preconditioner is the LSC block preconditioner
574  // applied to a 3D problem, it expects 4 doftypes:
575  // 3 velocity, [u, v, w] and 1 pressure [p],
576  // giving us the dof type ordering
577  // [u v w p].
578  //
579  // The LSC preconditioner groups the velocity and pressure doftypes
580  // separately, thus the dof_to_block_map will be:
581  // [0 0 0 1]
582  //
583  // Then the Doftype_coarsen_map_coarse MUST have length 4, to identify
584  // which of the OTHER (possibly coarsened) dof types should be grouped
585  // together to be considered as a single dof types of THIS preconditioner.
586  //
587  // For example, if the preconditioner above this one has the dof type
588  // ordering:
589  // 0 1 2 3 4 5 6 7 8 9
590  // ub vb wb up vp wp ut vt wt p
591  // Then we want to tell THIS preconditioner which dof types belongs to
592  // u, v, w and p, by providing the optional argument
593  // Doftype_coarsen_map_coarse to the
594  // turn_into_subsidiary_block_preconditioner(...) function
595  // [[0 3 6] [1 4 7] [2 5 8] [9]]
596  //
597  // So, it is important that the length of dof_to_block_map is the same as
598  // the length of Doftype_coarsen_map_coarse. We check this.
599  unsigned dof_to_block_map_size = dof_to_block_map.size();
600 
601 #ifdef PARANOID
602  if(dof_to_block_map_size != Doftype_coarsen_map_coarse.size())
603  {
604  std::ostringstream err_msg;
605  err_msg
606  << "The size of dof_to_block_map and Doftype_coarsen_map_coarse is not "
607  << "the same.\n"
608  << "dof_to_block_map.size() = " << dof_to_block_map_size << "\n"
609  << "Doftype_coarsen_map_coarse.size() = "
610  << Doftype_coarsen_map_coarse.size() << ".\n"
611  << "One of the two list is incorrect, please look at the comments\n"
612  << "in the source code for more details.";
613  throw OomphLibWarning(err_msg.str(),
614  OOMPH_CURRENT_FUNCTION,
615  OOMPH_EXCEPTION_LOCATION);
616  }
617 #endif
618 
619  // Create the Block_to_dof_map_coarse from
620  // the dof_to_block_map and Doftype_coarsen_map_coarse.
621 
622  // find the maximum block number
623  unsigned max_block_number = *std::max_element(dof_to_block_map.begin(),
624  dof_to_block_map.end());
625 
626  // Now we do the following:
627  // Lets say the Doftype_coarsen_map_coarse is:
628  // [0 3 6]
629  // [1 4 7]
630  // [2 5 8]
631  // [9]
632  //
633  // (this is the same as the above example)
634  //
635  // and the dof_to_block_map is [0 0 0 1].
636  //
637  // Then we need to form the Block_to_dof_map_coarse:
638  // [0 3 6 1 4 7 2 5 8]
639  // [9]
640 
641  // Clear anything in the Block_to_dof_map_coarse
642  Block_to_dof_map_coarse.clear();
643 
644  const unsigned tmp_nblock = max_block_number + 1;
645 
646  Block_to_dof_map_coarse.resize(tmp_nblock);
647 
648  for (unsigned i = 0; i < dof_to_block_map_size; i++)
649  {
650  Block_to_dof_map_coarse[dof_to_block_map[i]].push_back(i);
651  }
652 
653  Block_to_dof_map_fine.clear();
654  Block_to_dof_map_fine.resize(tmp_nblock);
655  for (unsigned block_i = 0; block_i < tmp_nblock; block_i++)
656  {
657  // get the dof types in this block.
658  const unsigned ndof_in_block = Block_to_dof_map_coarse[block_i].size();
659  for (unsigned dof_i = 0; dof_i < ndof_in_block; dof_i++)
660  {
661  const unsigned coarsened_dof_i=Block_to_dof_map_coarse[block_i][dof_i];
662 
663  // Insert the most fine grain dofs which this dof_i corresponds to
664  // into block_i
665  Vector<unsigned> dof_i_dofs
666  =Doftype_coarsen_map_fine[coarsened_dof_i];
667 
668  Block_to_dof_map_fine[block_i].insert(
669  Block_to_dof_map_fine[block_i].end(),
670  dof_i_dofs.begin(),
671  dof_i_dofs.end());
672  }
673  }
674 
675  // Now set the dof_to_block_map to the identify.
676  // NOTE: We are now using the internal n dof types. This is because the
677  // dof type coarsening feature was built on top of the existing block
678  // preconditioning framework which does not handle coarsening of dof types.
679  // Hence, under the hood, it still works with the most fine grain dof types
680  // and does not do any coarsening.
681 
682  // Locally cache the internal ndof types (using access function because
683  // the Internal_ndof_type variable may not be set up yet if this is a
684  // master preconditioner).
685  unsigned tmp_internal_ndof_types = internal_ndof_types();
686 
687  dof_to_block_map.resize(tmp_internal_ndof_types,0);
688 
689  for (unsigned i = 0; i < tmp_internal_ndof_types; i++)
690  {
691  dof_to_block_map[i] = i;
692  }
693  } // end of Block_to_dof_map_coarse encapsulation
694 
695 #ifdef PARANOID
696 
697  // Check that the meshes are ok. This only needs to be done in the master
698  // because subsidiary preconditioners don't do anything with the meshes
699  // here.
700  if(is_master_block_preconditioner())
701  {
702  // This is declared as local_nmesh because there are other variables
703  // called nmesh floating about... but this will not exist if PARANOID is
704  // switched on.
705  unsigned local_nmesh = nmesh();
706 
707  // Check that some mesh pointers have been assigned.
708  if(local_nmesh == 0)
709  {
710  std::ostringstream error_msg;
711  error_msg << "Cannot setup blocks because no meshes have been set.";
712  throw OomphLibError(error_msg.str(),
713  OOMPH_CURRENT_FUNCTION,
714  OOMPH_EXCEPTION_LOCATION);
715  }
716 
717  // Each mesh must contain elements with the same number of dof.
718  // A stricter check is to ensure that the mesh contains only one type of
719  // elements. This is relaxed in same cases.
720  for (unsigned mesh_i = 0; mesh_i < local_nmesh; mesh_i++)
721  {
722  // The number of elements in the current mesh.
723  unsigned n_element = mesh_pt(mesh_i)->nelement();
724 
725  // When the bulk mesh is distributed, there may not be any elements
726  // in the surface mesh(es).
727  if(n_element > 0)
728  {
729  // The string of the first element in the current mesh.
730  std::string first_element_string
731  =typeid(*(mesh_pt(mesh_i)->element_pt(0))).name();
732 
733  // If there are multiple element types in the current mesh,
734  // we can at least make sure that they contain the same types of DOFs.
735  if(bool(Allow_multiple_element_type_in_mesh[mesh_i]))
736  {
737  // The ndof types of the first element.
738  unsigned first_element_ndof_type =
739  mesh_pt(mesh_i)->element_pt(0)->ndof_types();
740 
741  // Loop through the meshes and compare the number of types of DOFs.
742  for (unsigned el_i = 1; el_i < n_element; el_i++)
743  {
744  // The ndof type of the current element.
745  unsigned current_element_ndof_type =
746  mesh_pt(mesh_i)->element_pt(el_i)->ndof_types();
747 
748  // The string of the current element.
749  std::string current_element_string
750  =typeid(*(mesh_pt(mesh_i)->element_pt(el_i))).name();
751 
752  // Compare against the first element.
753  if(current_element_ndof_type != first_element_ndof_type)
754  {
755  std::ostringstream error_message;
756  error_message
757  << "Elements in the same mesh MUST have the same number of types "
758  << "of DOFs.\n"
759  << "The element in mesh " << mesh_i << ", at position "
760  << el_i << " is: \n"
761  << current_element_string <<", \n"
762  << "with ndof types: " << current_element_ndof_type << ".\n"
763  << "The first element in the same mesh is: \n"
764  << first_element_string << ", \n"
765  << "with ndof types: " << first_element_ndof_type << ".\n";
766  throw OomphLibError(error_message.str(),
767  OOMPH_CURRENT_FUNCTION,
768  OOMPH_EXCEPTION_LOCATION);
769  }
770  }
771  }
772  else
773  // There should be only one type of elements in the current mesh. Check
774  // that this is the case!
775  {
776  // Loop through the elements in the current mesh.
777  for (unsigned el_i = 1; el_i < n_element; el_i++)
778  {
779  // The string of the current element.
780  std::string current_element_string
781  =typeid(*(mesh_pt(mesh_i)->element_pt(el_i))).name();
782 
783  // Compare against the first element.
784  if(current_element_string.compare(first_element_string) != 0)
785  {
786  std::ostringstream error_message;
787  error_message
788  << "By default, a mesh containing block preconditionable "
789  << "elements must contain only one type of element.\n"
790  << "The element in mesh " << mesh_i << ", at position "
791  << el_i << " is: \n" << current_element_string << "\n"
792  << "The first element in the same mesh is: \n"
793  << first_element_string << "\n"
794  << "If this is correct, consider calling the set_mesh(...) with\n"
795  << "the optional argument set true to allow multiple element\n"
796  << "types in the same mesh.\n"
797  << "Note: A minimal requirement is that the elements in the same\n"
798  << "mesh MUST have the same number of DOF types.";
799  throw OomphLibError(error_message.str(),
800  OOMPH_CURRENT_FUNCTION,
801  OOMPH_EXCEPTION_LOCATION);
802  }
803  }
804  }
805  }
806  }
807  }
808 
809 #endif
810  // clear the memory
811  this->clear_block_preconditioner_base();
812 
813  // get my_rank and nproc
814 #ifdef OOMPH_HAS_MPI
815  unsigned my_rank = comm_pt()->my_rank();
816  unsigned nproc = comm_pt()->nproc();
817 #endif
818 
819 
820  /////////////////////////////////////////////////////////////////////////////
821  // start of master block preconditioner only operations
822  /////////////////////////////////////////////////////////////////////////////
823 #ifdef OOMPH_HAS_MPI
824  unsigned* nreq_sparse = new unsigned[nproc]();
825  unsigned* nreq_sparse_for_proc = new unsigned[nproc]();
826  unsigned** index_in_dof_block_sparse_send = new unsigned*[nproc]();
827  unsigned** dof_number_sparse_send = new unsigned*[nproc]();
828  Vector<MPI_Request> send_requests_sparse;
829  Vector<MPI_Request> recv_requests_sparse;
830 #endif
831 
832  // If this preconditioner is the master preconditioner then we need
833  // to assemble the vectors : Dof_number
834  // Index_in_dof_block
835  if (is_master_block_preconditioner())
836  {
837  // Get the number of dof types in each mesh.
838  Ndof_types_in_mesh.assign(nmesh(),0);
839  for(unsigned i=0; i<nmesh(); i++)
840  {
841  Ndof_types_in_mesh[i] = mesh_pt(i)->ndof_types();
842  }
843  // Setup the distribution of this preconditioner, assumed to be the same
844  // as the matrix if the matrix is distributable.
845  if (dynamic_cast<DistributableLinearAlgebraObject*>(matrix_pt()))
846  {
847  this->build_distribution
848  (dynamic_cast<DistributableLinearAlgebraObject*>
849  (matrix_pt())->distribution_pt());
850  }
851  else
852  {
853  LinearAlgebraDistribution dist(comm_pt(),
854  matrix_pt()->nrow(),false);
855  this->build_distribution(dist);
856  }
857  Nrow = matrix_pt()->nrow();
858 
859  // Boolean to indicate whether the matrix is actually distributed,
860  // ie distributed and on more than one processor.
861  bool matrix_distributed =
862  (this->distribution_pt()->distributed() &&
863  this->distribution_pt()->communicator_pt()->nproc() > 1);
864 
865 
866  // Matrix must be a CR matrix.
867  CRDoubleMatrix* cr_matrix_pt
868  = dynamic_cast<CRDoubleMatrix*>(matrix_pt());
869 
870  if (cr_matrix_pt == 0)
871  {
872  std::ostringstream error_message;
873  error_message << "Block setup for distributed matrices only works "
874  << "for CRDoubleMatrices";
875  throw OomphLibError(error_message.str(),
876  OOMPH_CURRENT_FUNCTION,
877  OOMPH_EXCEPTION_LOCATION);
878  }
879 
880 
881 
882 
883  // Get distribution.
884  unsigned first_row = this->distribution_pt()->first_row();
885  unsigned nrow_local = this->distribution_pt()->nrow_local();
886  unsigned last_row = first_row+nrow_local-1;
887 
888 #ifdef OOMPH_HAS_MPI
889  // storage for the rows required by each processor in the dense
890  // block lookup storage scheme
891  // dense_required_rows(p,0) is the minimum global index required by proc p
892  // ...(p,1) is the maximum global index required by proc p
893  DenseMatrix<unsigned> dense_required_rows(nproc,2);
894  for (unsigned p = 0; p < nproc; p++)
895  {
896  dense_required_rows(p,0) = this->distribution_pt()->first_row(p);
897  dense_required_rows(p,1) = this->distribution_pt()->first_row(p)
898  +this->distribution_pt()->nrow_local(p) - 1;
899  }
900 
901  // determine the global rows That are not in the range first_row to
902  // first_row+nrow_local for which we should store the
903  // Dof_index and Index_in_dof_block for
904  // then send the lists to other processors
905  std::set<unsigned> sparse_global_rows_for_block_lookup;
906  if (matrix_distributed)
907  {
908  unsigned nnz = cr_matrix_pt->nnz();
909  int* column_index = cr_matrix_pt->column_index();
910  for (unsigned i = 0; i < nnz; i++)
911  {
912  unsigned ci = column_index[i];
913  if (ci<first_row || ci>last_row)
914  {
915  sparse_global_rows_for_block_lookup.insert(ci);
916  }
917  }
918  }
919 
920  int nsparse = sparse_global_rows_for_block_lookup.size();
921 
922  Global_index_sparse.resize(0);
923  std::copy(sparse_global_rows_for_block_lookup.begin(),
924  sparse_global_rows_for_block_lookup.end(),
925  std::back_inserter(Global_index_sparse));
926 
927  Index_in_dof_block_sparse.resize(nsparse);
928  Dof_number_sparse.resize(nsparse);
929  sparse_global_rows_for_block_lookup.clear();
930 
931  Vector<MPI_Request> recv_requests_sparse_nreq;
932  if (matrix_distributed)
933  {
934  MPI_Aint base_displacement_sparse;
935  MPI_Address(nreq_sparse,&base_displacement_sparse);
936 
937  int zero = 0;
938  for (unsigned p = 0; p < nproc; p++)
939  {
940  // determine the global eqn numbers required by this processor
941  // that can be classified by processor p
942  int begin = 0;
943  for (int i = 0; i < nsparse; ++i)
944  {
945  if (Global_index_sparse[i]<dense_required_rows(p,0))
946  {
947  ++begin;
948  }
949  else
950  {
951  if (Global_index_sparse[i]<=dense_required_rows(p,1))
952  {
953  ++nreq_sparse[p];
954  }
955  else
956  {
957  break;
958  }
959  }
960  }
961 
962  // if this processor has rows to be classified by proc p
963  if (nreq_sparse[p]>0)
964  {
965 
966  // send the number of global eqn numbers
967  MPI_Request req1;
968  MPI_Isend(&nreq_sparse[p],1,MPI_UNSIGNED,p,31,
969  comm_pt()->mpi_comm(),&req1);
970  send_requests_sparse.push_back(req1);
971 
972  // send the global eqn numbers
973  MPI_Request req2;
974  MPI_Isend(&Global_index_sparse[begin],
975  nreq_sparse[p],MPI_UNSIGNED,p,32,
976  comm_pt()->mpi_comm(),&req2);
977  send_requests_sparse.push_back(req2);
978 
979  // post the recvs for the data that will be returned
980 
981  // the datatypes, displacements, lengths for the two datatypes
982  MPI_Datatype types[2];
983  MPI_Aint displacements[2];
984  int lengths[2];
985 
986  // index in dof block
987  MPI_Type_contiguous(nreq_sparse[p],MPI_UNSIGNED,&types[0]);
988  MPI_Type_commit(&types[0]);
989  MPI_Address(&Index_in_dof_block_sparse[begin],&displacements[0]);
990  displacements[0] -= base_displacement_sparse;
991  lengths[0] = 1;
992 
993  // dof number
994  MPI_Type_contiguous(nreq_sparse[p],MPI_UNSIGNED,&types[1]);
995  MPI_Type_commit(&types[1]);
996  MPI_Address(&Dof_number_sparse[begin],&displacements[1]);
997  displacements[1] -= base_displacement_sparse;
998  lengths[1] = 1;
999 
1000  // build the final type
1001  MPI_Datatype recv_type;
1002  MPI_Type_struct(2,lengths,displacements,types,&recv_type);
1003  MPI_Type_commit(&recv_type);
1004  MPI_Type_free(&types[0]);
1005  MPI_Type_free(&types[1]);
1006 
1007  // and recv
1008  MPI_Request req;
1009  MPI_Irecv(nreq_sparse,1,recv_type,p,33,
1010  comm_pt()->mpi_comm(),&req);
1011  recv_requests_sparse.push_back(req);
1012  MPI_Type_free(&recv_type);
1013  }
1014 
1015  // if no communication required, confirm this
1016  if (nreq_sparse[p]==0)
1017  {
1018  MPI_Request req1;
1019  MPI_Isend(&zero,1,MPI_UNSIGNED,p,31,
1020  comm_pt()->mpi_comm(),&req1);
1021  send_requests_sparse.push_back(req1);
1022  }
1023 
1024  //
1025  MPI_Request req;
1026  MPI_Irecv(&nreq_sparse_for_proc[p],1,MPI_UNSIGNED,p,31,
1027  comm_pt()->mpi_comm(),&req);
1028  recv_requests_sparse_nreq.push_back(req);
1029  }
1030  }
1031 #endif
1032 
1033  // resize the storage
1034  Dof_number_dense.resize(nrow_local);
1035  Index_in_dof_block_dense.resize(nrow_local);
1036 
1037  // zero the number of dof types
1038  Internal_ndof_types = 0;
1039 
1040 #ifdef PARANOID
1041  // Vector to keep track of previously assigned block numbers
1042  // to check consistency between multiple assignments.
1043  Vector<int> previously_assigned_block_number(nrow_local,
1045 #endif
1046 
1047  // determine whether the problem is distribution
1048  bool problem_distributed = false;
1049 
1050  // the problem method distributed() is only accessible with MPI
1051 #ifdef OOMPH_HAS_MPI
1052  problem_distributed = any_mesh_distributed();
1053 #endif
1054 
1055  // if the problem is not distributed
1056  if (!problem_distributed)
1057  {
1058  // Offset for the block type in the overall system.
1059  // Different meshes contain different block-preconditionable
1060  // elements -- their blocks are added one after the other.
1061  unsigned dof_offset=0;
1062 
1063  // Loop over all meshes.
1064  for (unsigned m=0;m<nmesh();m++)
1065  {
1066  // Number of elements in this mesh.
1067  unsigned n_element = mesh_pt(m)->nelement();
1068 
1069  // Find the number of block types that the elements in this mesh
1070  // are in charge of.
1071  unsigned ndof_in_element = ndof_types_in_mesh(m);
1072  Internal_ndof_types += ndof_in_element;
1073 
1074  for (unsigned e=0;e<n_element;e++)
1075  {
1076  // List containing pairs of global equation number and
1077  // dof number for each global dof in an element.
1078  std::list<std::pair<unsigned long,unsigned> > dof_lookup_list;
1079 
1080  // Get list of blocks associated with the element's global unknowns.
1081  mesh_pt(m)->element_pt(e)->
1082  get_dof_numbers_for_unknowns(dof_lookup_list);
1083 
1084  // Loop over all entries in the list
1085  // and store the block number.
1086  typedef std::list<std::pair<unsigned long,unsigned> >::iterator IT;
1087  for (IT it=dof_lookup_list.begin();
1088  it!=dof_lookup_list.end();it++)
1089  {
1090  unsigned long global_dof = it->first;
1091  if (global_dof >= unsigned(first_row) &&
1092  global_dof <= unsigned(last_row))
1093  {
1094  unsigned dof_number = (it->second)+dof_offset;
1095  Dof_number_dense[global_dof-first_row]=dof_number;
1096 
1097 #ifdef PARANOID
1098  // Check consistency of block numbers if assigned multiple times
1099  if (previously_assigned_block_number[global_dof-
1100  first_row]<0)
1101  {
1102  previously_assigned_block_number[global_dof-first_row]
1103  =dof_number;
1104  }
1105 #endif
1106  }
1107  }
1108  }
1109 
1110  // About to do the next mesh which contains block preconditionable
1111  // elements of a different type; all the dofs that these elements are
1112  // "in charge of" differ from the ones considered so far.
1113  // Bump up the block counter to make sure we're not overwriting
1114  // anything here
1115  dof_offset+=ndof_in_element;
1116  }
1117 
1118 #ifdef PARANOID
1119  // check that every global equation number has been allocated a dof type
1120  for (unsigned i = 0; i < nrow_local; i++)
1121  {
1122  if (previously_assigned_block_number[i] < 0)
1123  {
1124  std::ostringstream error_message;
1125  error_message << "Not all degrees of freedom have had DOF type "
1126  << "numbers allocated. Dof number " << i
1127  << " is unallocated.";
1128  throw OomphLibError(error_message.str(),
1129  OOMPH_CURRENT_FUNCTION,
1130  OOMPH_EXCEPTION_LOCATION);
1131  }
1132  }
1133 #endif
1134  }
1135  // else the problem is distributed
1136  else
1137  {
1138 #ifdef OOMPH_HAS_MPI
1139  // Offset for the block type in the overall system.
1140  // Different meshes contain different block-preconditionable
1141  // elements -- their blocks are added one after the other...
1142  unsigned dof_offset=0;
1143 
1144  // the set of global degrees of freedom and their corresponding dof
1145  // number on this processor
1146  std::map<unsigned long,unsigned > my_dof_map;
1147 
1148  // Loop over all meshes
1149  for (unsigned m=0;m<nmesh();m++)
1150  {
1151  // Number of elements in this mesh
1152  unsigned n_element = this->mesh_pt(m)->nelement();
1153 
1154  // Find the number of block types that the elements in this mesh
1155  // are in charge of
1156  unsigned ndof_in_element = ndof_types_in_mesh(m);
1157  Internal_ndof_types += ndof_in_element;
1158 
1159  // Loop over all elements
1160  for (unsigned e=0;e<n_element;e++)
1161  {
1162 
1163  // if the element is not a halo element
1164  if (!this->mesh_pt(m)->element_pt(e)->is_halo())
1165  {
1166  // List containing pairs of global equation number and
1167  // dof number for each global dof in an element
1168  std::list<std::pair<unsigned long,unsigned> > dof_lookup_list;
1169 
1170  // Get list of blocks associated with the element's global
1171  // unknowns
1172  this->mesh_pt(m)->element_pt(e)->
1173  get_dof_numbers_for_unknowns(dof_lookup_list);
1174 
1175  // update the block numbers and put it in the map.
1176  typedef
1177  std::list<std::pair<unsigned long,unsigned> >::iterator IT;
1178  for (IT it=dof_lookup_list.begin();
1179  it!=dof_lookup_list.end();it++)
1180  {
1181  it->second = (it->second)+dof_offset;
1182  my_dof_map[it->first] = it->second;
1183  }
1184  }
1185  }
1186 
1187  // About to do the next mesh which contains block preconditionable
1188  // elements of a different type; all the dofs that these elements are
1189  // "in charge of" differ from the ones considered so far.
1190  // Bump up the block counter to make sure we're not overwriting
1191  // anything here
1192  dof_offset+=ndof_in_element;
1193  }
1194 
1195  // next copy the map of my dofs to two vectors to send
1196  unsigned my_ndof = my_dof_map.size();
1197  unsigned long* my_global_dofs = new unsigned long[my_ndof];
1198  unsigned* my_dof_numbers = new unsigned[my_ndof];
1199  typedef
1200  std::map<unsigned long,unsigned >::iterator IT;
1201  unsigned pt = 0;
1202  for (IT it = my_dof_map.begin(); it != my_dof_map.end(); it++)
1203  {
1204  my_global_dofs[pt] = it->first;
1205  my_dof_numbers[pt] = it->second;
1206  pt++;
1207  }
1208 
1209  // and then clear the map
1210  my_dof_map.clear();
1211 
1212  // count up how many DOFs need to be sent to each processor
1213  int* first_dof_to_send = new int[nproc];
1214  int* ndof_to_send = new int[nproc];
1215  unsigned ptr = 0;
1216  for (unsigned p = 0; p < nproc; p++)
1217  {
1218  first_dof_to_send[p] = 0;
1219  ndof_to_send[p] = 0;
1220  while (ptr < my_ndof && my_global_dofs[ptr] < dense_required_rows(p,0))
1221  {
1222  ptr++;
1223  }
1224  first_dof_to_send[p] = ptr;
1225  while (ptr < my_ndof && my_global_dofs[ptr] <= dense_required_rows(p,1))
1226  {
1227  ndof_to_send[p]++;
1228  ptr++;
1229  }
1230  }
1231 
1232  // next communicate to each processor how many dofs it expects to recv
1233  int* ndof_to_recv = new int[nproc];
1234  MPI_Alltoall(ndof_to_send,1,MPI_INT,ndof_to_recv,1,MPI_INT,
1235  comm_pt()->mpi_comm());
1236 
1237  // the base displacements for the sends
1238  MPI_Aint base_displacement;
1239  MPI_Address(my_global_dofs,&base_displacement);
1240 
1241 #ifdef PARANOID
1242  // storage for paranoid check to ensure that every row as been
1243  // imported
1244  std::vector<bool> dof_recv(nrow_local,false);
1245 #endif
1246 
1247  // next send and recv
1248  Vector<MPI_Request> send_requests;
1249  Vector<MPI_Request> recv_requests;
1250  Vector<unsigned long*> global_dofs_recv(nproc,0);
1251  Vector<unsigned*> dof_numbers_recv(nproc,0);
1252  Vector<unsigned> proc;
1253  for (unsigned p = 0; p < nproc; p++)
1254  {
1255  if (p != my_rank)
1256  {
1257 
1258  // send
1259  if (ndof_to_send[p] > 0)
1260  {
1261  // the datatypes, displacements, lengths for the two datatypes
1262  MPI_Datatype types[2];
1263  MPI_Aint displacements[2];
1264  int lengths[2];
1265 
1266  // my global dofs
1267  MPI_Type_contiguous(ndof_to_send[p],MPI_UNSIGNED_LONG,&types[0]);
1268  MPI_Type_commit(&types[0]);
1269  MPI_Address(my_global_dofs + first_dof_to_send[p],
1270  &displacements[0]);
1271  displacements[0] -= base_displacement;
1272  lengths[0] = 1;
1273 
1274  // my dof numbers
1275  MPI_Type_contiguous(ndof_to_send[p],MPI_UNSIGNED,&types[1]);
1276  MPI_Type_commit(&types[1]);
1277  MPI_Address(my_dof_numbers + first_dof_to_send[p],
1278  &displacements[1]);
1279  displacements[1] -= base_displacement;
1280  lengths[1] = 1;
1281 
1282  // build the final type
1283  MPI_Datatype send_type;
1284  MPI_Type_struct(2,lengths,displacements,types,&send_type);
1285  MPI_Type_commit(&send_type);
1286  MPI_Type_free(&types[0]);
1287  MPI_Type_free(&types[1]);
1288 
1289  // and send
1290  MPI_Request req;
1291  MPI_Isend(my_global_dofs,1,send_type,p,2,
1292  comm_pt()->mpi_comm(),&req);
1293  send_requests.push_back(req);
1294  MPI_Type_free(&send_type);
1295  }
1296 
1297  // and recv
1298  if (ndof_to_recv[p] > 0)
1299  {
1300  // resize the storage
1301  global_dofs_recv[p] = new unsigned long[ndof_to_recv[p]];
1302  dof_numbers_recv[p] = new unsigned[ndof_to_recv[p]];
1303  proc.push_back(p);
1304 
1305  // the datatypes, displacements, lengths for the two datatypes
1306  MPI_Datatype types[2];
1307  MPI_Aint displacements[2];
1308  int lengths[2];
1309 
1310  // my global dofs
1311  MPI_Type_contiguous(ndof_to_recv[p],MPI_UNSIGNED_LONG,&types[0]);
1312  MPI_Type_commit(&types[0]);
1313  MPI_Address(global_dofs_recv[p],&displacements[0]);
1314  displacements[0] -= base_displacement;
1315  lengths[0] = 1;
1316 
1317  // my dof numbers
1318  MPI_Type_contiguous(ndof_to_recv[p],MPI_UNSIGNED,&types[1]);
1319  MPI_Type_commit(&types[1]);
1320  MPI_Address(dof_numbers_recv[p],&displacements[1]);
1321  displacements[1] -= base_displacement;
1322  lengths[1] = 1;
1323 
1324  // build the final type
1325  MPI_Datatype recv_type;
1326  MPI_Type_struct(2,lengths,displacements,types,&recv_type);
1327  MPI_Type_commit(&recv_type);
1328  MPI_Type_free(&types[0]);
1329  MPI_Type_free(&types[1]);
1330 
1331  // and recv
1332  MPI_Request req;
1333  MPI_Irecv(my_global_dofs,1,recv_type,p,2,
1334  comm_pt()->mpi_comm(),&req);
1335  recv_requests.push_back(req);
1336  MPI_Type_free(&recv_type);
1337  }
1338 
1339  }
1340  // send to self
1341  else
1342  {
1343  unsigned u = first_dof_to_send[p] + ndof_to_recv[p];
1344  for (unsigned i = first_dof_to_send[p]; i < u; i++)
1345  {
1346 #ifdef PARANOID
1347  // indicate that this dof has ben recv
1348  dof_recv[my_global_dofs[i]-first_row] = true;
1349 #endif
1350  Dof_number_dense[my_global_dofs[i]-first_row] =
1351  my_dof_numbers[i];
1352  }
1353  }
1354  }
1355 
1356  // recv and store the data
1357  unsigned c_recv = recv_requests.size();
1358  while (c_recv > 0)
1359  {
1360 
1361  // wait for any communication to finish
1362  int req_number;
1363  MPI_Waitany(c_recv,&recv_requests[0],&req_number,MPI_STATUS_IGNORE);
1364  recv_requests.erase(recv_requests.begin()+req_number);
1365  c_recv--;
1366 
1367  // determine the source processor
1368  unsigned p = proc[req_number];
1369  proc.erase(proc.begin()+req_number);
1370 
1371  // import the data
1372  for (int i = 0; i < ndof_to_recv[p]; i++)
1373  {
1374 #ifdef PARANOID
1375  // indicate that this dof has ben recv
1376  dof_recv[global_dofs_recv[p][i]-first_row] = true;
1377 #endif
1378  Dof_number_dense[global_dofs_recv[p][i]-first_row]
1379  = dof_numbers_recv[p][i];
1380  }
1381 
1382  // delete the data
1383  delete[] global_dofs_recv[p];
1384  delete[] dof_numbers_recv[p];
1385  }
1386 
1387  // finally wait for the send requests to complete as we are leaving
1388  // an MPI block of code
1389  unsigned csr = send_requests.size();
1390  if (csr)
1391  {
1392  MPI_Waitall(csr,&send_requests[0],MPI_STATUS_IGNORE);
1393  }
1394 
1395  // clean up
1396  delete[] ndof_to_send;
1397  delete[] first_dof_to_send;
1398  delete[] ndof_to_recv;
1399  delete[] my_global_dofs;
1400  delete[] my_dof_numbers;
1401 #ifdef PARANOID
1402  unsigned all_recv = true;
1403  for (unsigned i = 0; i < nrow_local; i++)
1404  {
1405  if (!dof_recv[i])
1406  {
1407  all_recv = false;
1408  }
1409  }
1410  if (!all_recv)
1411  {
1412  std::ostringstream error_message;
1413  error_message << "Not all the DOF numbers required were received";
1414  throw OomphLibError(error_message.str(),
1415  OOMPH_CURRENT_FUNCTION,
1416  OOMPH_EXCEPTION_LOCATION);
1417  }
1418 #endif
1419 #else
1420  std::ostringstream error_message;
1421  error_message
1422  << "The problem appears to be distributed, MPI is required";
1423  throw OomphLibError(error_message.str(),
1424  OOMPH_CURRENT_FUNCTION,
1425  OOMPH_EXCEPTION_LOCATION);
1426 #endif
1427  }
1428 #ifdef OOMPH_HAS_MPI
1429  Vector<unsigned*> sparse_rows_for_proc(nproc,0);
1430  Vector<MPI_Request> sparse_rows_for_proc_requests;
1431  if (matrix_distributed)
1432  {
1433  // wait for number of sparse rows each processor requires
1434  // post recvs for that data
1435  if (recv_requests_sparse_nreq.size()>0)
1436  {
1437  MPI_Waitall(recv_requests_sparse_nreq.size(),
1438  &recv_requests_sparse_nreq[0],
1439  MPI_STATUS_IGNORE);
1440  }
1441  for (unsigned p = 0; p < nproc; ++p)
1442  {
1443  if (nreq_sparse_for_proc[p] > 0)
1444  {
1445  MPI_Request req;
1446  sparse_rows_for_proc[p] = new unsigned[nreq_sparse_for_proc[p]];
1447  MPI_Irecv(sparse_rows_for_proc[p],nreq_sparse_for_proc[p],
1448  MPI_UNSIGNED,p,32,
1449  comm_pt()->mpi_comm(),&req);
1450  sparse_rows_for_proc_requests.push_back(req);
1451  }
1452  }
1453  }
1454 #endif
1455 
1456 
1457  // for every global degree of freedom required by this processor we now
1458  // have the corresponding dof number
1459 
1460  // clear the Ndof_in_dof_block storage
1461  Dof_dimension.assign(Internal_ndof_types,0);
1462 
1463  // first consider a non distributed matrix
1464  if (!matrix_distributed)
1465  {
1466  // set the Index_in_dof_block
1467  unsigned nrow = this->distribution_pt()->nrow();
1468  Index_in_dof_block_dense.resize(nrow);
1469  Index_in_dof_block_dense.initialise(0);
1470  for (unsigned i = 0; i < nrow; i++)
1471  {
1472  Index_in_dof_block_dense[i] = Dof_dimension[Dof_number_dense[i]];
1473  Dof_dimension[Dof_number_dense[i]]++;
1474  }
1475  }
1476 
1477  // next a distributed matrix
1478  else
1479  {
1480 #ifdef OOMPH_HAS_MPI
1481 
1482 
1483  // first compute how many instances of each dof are on this
1484  // processor
1485  unsigned* my_nrows_in_dof_block = new unsigned[Internal_ndof_types];
1486  for (unsigned i = 0; i < Internal_ndof_types; i++)
1487  {
1488  my_nrows_in_dof_block[i] = 0;
1489  }
1490  for (unsigned i = 0; i < nrow_local; i++)
1491  {
1492  my_nrows_in_dof_block[Dof_number_dense[i]]++;
1493  }
1494 
1495  // next share the data
1496  unsigned* nrow_in_dof_block_recv = new unsigned[Internal_ndof_types*nproc];
1497  MPI_Allgather(my_nrows_in_dof_block,Internal_ndof_types,MPI_UNSIGNED,
1498  nrow_in_dof_block_recv,Internal_ndof_types,MPI_UNSIGNED,
1499  comm_pt()->mpi_comm());
1500  delete[] my_nrows_in_dof_block;
1501 
1502  // compute my first dof index and Nrows_in_dof_block
1503  Vector<unsigned> my_first_dof_index(Internal_ndof_types,0);
1504  for (unsigned i = 0; i < Internal_ndof_types; i++)
1505  {
1506  for (unsigned p = 0; p < my_rank; p++)
1507  {
1508  my_first_dof_index[i] += nrow_in_dof_block_recv[p*Internal_ndof_types + i];
1509  }
1510  Dof_dimension[i] = my_first_dof_index[i];
1511  for (unsigned p = my_rank; p < nproc; p++)
1512  {
1513  Dof_dimension[i] += nrow_in_dof_block_recv[p*Internal_ndof_types + i];
1514  }
1515  }
1516  delete[] nrow_in_dof_block_recv;
1517 
1518  // next compute Index in dof block
1519  Index_in_dof_block_dense.resize(nrow_local);
1520  Index_in_dof_block_dense.initialise(0);
1521  Vector<unsigned> dof_counter(Internal_ndof_types,0);
1522  for (unsigned i = 0; i < nrow_local; i++)
1523  {
1524  Index_in_dof_block_dense[i] =
1525  my_first_dof_index[Dof_number_dense[i]] +
1526  dof_counter[Dof_number_dense[i]];
1527  dof_counter[Dof_number_dense[i]]++;
1528  }
1529 
1530  // the base displacements for the sends
1531  if (sparse_rows_for_proc_requests.size()>0)
1532  {
1533  MPI_Waitall(sparse_rows_for_proc_requests.size(),
1534  &sparse_rows_for_proc_requests[0],
1535  MPI_STATUS_IGNORE);
1536  }
1537  MPI_Aint base_displacement;
1538  MPI_Address(dof_number_sparse_send,&base_displacement);
1539  unsigned first_row = this->distribution_pt()->first_row();
1540  for (unsigned p = 0; p < nproc; ++p)
1541  {
1542  if (nreq_sparse_for_proc[p]>0)
1543  {
1544  // construct the data
1545  index_in_dof_block_sparse_send[p] =
1546  new unsigned[nreq_sparse_for_proc[p]];
1547  dof_number_sparse_send[p] =
1548  new unsigned[nreq_sparse_for_proc[p]];
1549  for (unsigned i = 0; i < nreq_sparse_for_proc[p]; ++i)
1550  {
1551  unsigned r = sparse_rows_for_proc[p][i];
1552  r -= first_row;
1553  index_in_dof_block_sparse_send[p][i]
1554  = Index_in_dof_block_dense[r];
1555  dof_number_sparse_send[p][i]
1556  = Dof_number_dense[r];
1557  }
1558  delete[] sparse_rows_for_proc[p];
1559 
1560  // send the data
1561  // the datatypes, displacements, lengths for the two datatypes
1562  MPI_Datatype types[2];
1563  MPI_Aint displacements[2];
1564  int lengths[2];
1565 
1566  // index in dof block
1567  MPI_Type_contiguous(nreq_sparse_for_proc[p],MPI_UNSIGNED,&types[0]);
1568  MPI_Type_commit(&types[0]);
1569  MPI_Address(index_in_dof_block_sparse_send[p],&displacements[0]);
1570  displacements[0] -= base_displacement;
1571  lengths[0] = 1;
1572 
1573  // dof number
1574  MPI_Type_contiguous(nreq_sparse_for_proc[p],MPI_UNSIGNED,&types[1]);
1575  MPI_Type_commit(&types[1]);
1576  MPI_Address(dof_number_sparse_send[p],&displacements[1]);
1577  displacements[1] -= base_displacement;
1578  lengths[1] = 1;
1579 
1580  // build the final type
1581  MPI_Datatype send_type;
1582  MPI_Type_struct(2,lengths,displacements,types,&send_type);
1583  MPI_Type_commit(&send_type);
1584  MPI_Type_free(&types[0]);
1585  MPI_Type_free(&types[1]);
1586 
1587  // and recv
1588  MPI_Request req;
1589  MPI_Isend(dof_number_sparse_send,1,send_type,p,33,
1590  comm_pt()->mpi_comm(),&req);
1591  send_requests_sparse.push_back(req);
1592  MPI_Type_free(&send_type);
1593  }
1594  else
1595  {
1596  index_in_dof_block_sparse_send[p] = 0;
1597  dof_number_sparse_send[p] = 0;
1598  }
1599  }
1600 #endif
1601  }
1602  }
1603 
1604  /////////////////////////////////////////////////////////////////////////////
1605  // end of master block preconditioner only operations
1606  /////////////////////////////////////////////////////////////////////////////
1607 
1608  // compute the number of rows in each block
1609 
1610 #ifdef PARANOID
1611  //check the vector is the correct length
1612  if (dof_to_block_map.size() != Internal_ndof_types)
1613  {
1614  std::ostringstream error_message;
1615  error_message
1616  << "The dof_to_block_map vector (size="
1617  << dof_to_block_map.size() << ") must be of size Internal_ndof_types="
1618  << Internal_ndof_types;
1619  throw OomphLibError(
1620  error_message.str(),
1621  OOMPH_CURRENT_FUNCTION,
1622  OOMPH_EXCEPTION_LOCATION);
1623  }
1624 #endif
1625 
1626  // find the maximum block number RAYAY use std::max_element
1627  unsigned max_block_number = 0;
1628  for (unsigned i = 0; i < Internal_ndof_types; i++)
1629  {
1630  if (dof_to_block_map[i] > max_block_number)
1631  {
1632  max_block_number = dof_to_block_map[i];
1633  }
1634  }
1635 
1636  // resize the storage the the block to dof map
1637  Block_number_to_dof_number_lookup.clear();
1638  Block_number_to_dof_number_lookup.resize(max_block_number+1);
1639  Ndof_in_block.clear();
1640  Ndof_in_block.resize(max_block_number+1);
1641 
1642  // resize storage
1643  Dof_number_to_block_number_lookup.resize(Internal_ndof_types);
1644 
1645  // build the storage for the two maps (block to dof) and (dof to block)
1646  for (unsigned i = 0; i < Internal_ndof_types; i++)
1647  {
1648  Dof_number_to_block_number_lookup[i] = dof_to_block_map[i];
1649  Block_number_to_dof_number_lookup[dof_to_block_map[i]].push_back(i);
1650  Ndof_in_block[dof_to_block_map[i]]++;
1651  }
1652 
1653 #ifdef PARANOID
1654  // paranoid check that every block number has at least one DOF associated
1655  // with it
1656  for (unsigned i = 0; i < max_block_number+1; i++)
1657  {
1658  if (Block_number_to_dof_number_lookup[i].size() == 0)
1659  {
1660  std::ostringstream error_message;
1661  error_message << "block number " << i
1662  << " does not have any DOFs associated with it";
1663  throw OomphLibWarning(
1664  error_message.str(),
1665  OOMPH_CURRENT_FUNCTION,
1666  OOMPH_EXCEPTION_LOCATION);
1667  }
1668  }
1669 #endif
1670 
1671  // Update the number of blocks types.
1672  Internal_nblock_types = max_block_number+1;
1673 
1674  // Distributed or not, depends on if we have more than one processor.
1675  bool distributed = this->master_distribution_pt()->distributed();
1676 
1677  // Create the new block distributions.
1678  Internal_block_distribution_pt.resize(Internal_nblock_types);
1679  for (unsigned i = 0; i < Internal_nblock_types; i++)
1680  {
1681  unsigned block_dim = 0;
1682  for (unsigned j = 0; j < Ndof_in_block[i]; j++)
1683  {
1684  block_dim +=
1685  internal_dof_block_dimension(Block_number_to_dof_number_lookup[i][j]);
1686  }
1687  Internal_block_distribution_pt[i] = new
1688  LinearAlgebraDistribution(comm_pt(),
1689  block_dim,distributed);
1690  }
1691 
1692  // Work out the distribution of the dof-level blocks.
1693  // Since several dof types may be coarsened into a single dof type.
1694  // We get the dof-level block distributions from the parent preconditioner.
1695 
1696  // How many dof types are there?
1697  if(is_subsidiary_block_preconditioner())
1698  {
1699  // Delete any pre-existing distributions.
1700  const unsigned dof_block_distribution_size
1701  = Dof_block_distribution_pt.size();
1702  for (unsigned dof_i = 0; dof_i < dof_block_distribution_size; dof_i++)
1703  {
1704  delete Dof_block_distribution_pt[dof_i];
1705  }
1706  const unsigned ndofs = this->ndof_types();
1707  Dof_block_distribution_pt.resize(ndofs,0);
1708 
1709  // For each dof type, work out how many parent preconditioner dof types are
1710  // in it.
1711  for (unsigned dof_i = 0; dof_i < ndofs; dof_i++)
1712  {
1713  // For each external dof, we get the dofs coarsened into it (from the
1714  // parent preconditioner level, not the most fine grain level).
1715  const unsigned ncoarsened_dofs_in_dof_i =
1716  Doftype_coarsen_map_coarse[dof_i].size();
1718  tmp_dist_pt(ncoarsened_dofs_in_dof_i,0);
1719  for (unsigned parent_dof_i=0;parent_dof_i<ncoarsened_dofs_in_dof_i;
1720  parent_dof_i++)
1721  {
1722  tmp_dist_pt[parent_dof_i]
1723  = master_block_preconditioner_pt()
1724  ->dof_block_distribution_pt(
1725  Doftype_in_master_preconditioner_coarse[
1726  Doftype_coarsen_map_coarse[dof_i][parent_dof_i] ] );
1727  }
1728 
1729  Dof_block_distribution_pt[dof_i] = new LinearAlgebraDistribution;
1730 
1731 
1733  *Dof_block_distribution_pt[
1734  dof_i]);
1735  }
1736 
1737 
1738  }
1739 
1740  // Create Block_distribution_pt
1741  {
1742  // Delete any existing distributions in Block_distribution_pt.
1743  // (This should already be deleted in clear_block_preconditioner_base(...)
1744  // but we are just being extra safe!).
1745  unsigned n_existing_block_dist
1746  = Block_distribution_pt.size();
1747  for (unsigned dist_i = 0; dist_i < n_existing_block_dist; dist_i++)
1748  {
1749  delete Block_distribution_pt[dist_i];
1750  }
1751 
1752  Block_distribution_pt.clear();
1753 
1754  // Work out the distributions of the concatenated blocks.
1755  unsigned super_block_size = Block_to_dof_map_coarse.size();
1756  Block_distribution_pt.resize(super_block_size,0);
1757  for (unsigned super_block_i = 0;
1758  super_block_i < super_block_size; super_block_i++)
1759  {
1760  unsigned sub_block_size = Block_to_dof_map_coarse[super_block_i].size();
1761  Vector<LinearAlgebraDistribution*> tmp_dist_pt(sub_block_size,0);
1762 
1763  for (unsigned sub_block_i = 0;
1764  sub_block_i < sub_block_size; sub_block_i++)
1765  {
1766  tmp_dist_pt[sub_block_i]
1767  = dof_block_distribution_pt(
1768  Block_to_dof_map_coarse[super_block_i][sub_block_i]);
1769  }
1770 
1771  Block_distribution_pt[super_block_i]
1773 
1775  tmp_dist_pt,*Block_distribution_pt[super_block_i]);
1776  }
1777 
1778  } // Creating Block_distribution_pt.
1779 
1780 
1781  // Create the distribution of the preconditioner matrix,
1782  // if this preconditioner is a subsidiary preconditioner then it stored
1783  // at Distribution_pt;
1784  // if this preconditioner is a master preconditioner then it is stored
1785  // at Internal_preconditioner_matrix_distribution_pt.
1787  LinearAlgebraDistributionHelpers::concatenate(Internal_block_distribution_pt,
1788  dist);
1789 
1790  // Build the distribution.
1791  if (is_subsidiary_block_preconditioner())
1792  {
1793  this->build_distribution(dist);
1794  }
1795  else
1796  {
1797  Internal_preconditioner_matrix_distribution_pt = new
1799  }
1800 
1801  Preconditioner_matrix_distribution_pt = new LinearAlgebraDistribution;
1803  concatenate(Block_distribution_pt,*Preconditioner_matrix_distribution_pt);
1804 
1805  // Clear all distributions in Auxiliary_block_distribution_pt, except for the
1806  // one which corresponds to the preconditioner matrix distribution.
1807  // This is already deleted by clear_block_preconditioner_base(...)
1808 
1809  // Create the key which corresponds to preconditioner_matrix_distribution_pt.
1810  {
1811  const unsigned nblocks = Block_distribution_pt.size();
1812  Vector<unsigned> preconditioner_matrix_key(nblocks,0);
1813  for (unsigned i = 0; i < nblocks; i++)
1814  {
1815  preconditioner_matrix_key[i] = i;
1816  }
1817 
1818  // Now iterate through Auxiliary_block_distribution_pt and delete everything
1819  // except for the value which corresponds to preconditioner_matrix_key.
1820  std::map<Vector<unsigned>, LinearAlgebraDistribution*>::iterator iter
1821  =Auxiliary_block_distribution_pt.begin();
1822  while(iter!=Auxiliary_block_distribution_pt.end())
1823  {
1824  if(iter->first!=preconditioner_matrix_key)
1825  {
1826  delete iter->second;
1827  iter++;
1828  }
1829  else
1830  {
1831  ++iter;
1832  }
1833  }
1834 
1835  // Clear it just to be safe!
1836  Auxiliary_block_distribution_pt.clear();
1837 
1838  // Insert the preconditioner matrix distribution.
1839  insert_auxiliary_block_distribution(preconditioner_matrix_key,
1840  Preconditioner_matrix_distribution_pt);
1841  } // End of Auxiliary_block_distribution_pt encapsulation.
1842 
1843  // Clearing up after comm to assemble sparse lookup schemes.
1844 #ifdef OOMPH_HAS_MPI
1845  if (send_requests_sparse.size()>0)
1846  {
1847  MPI_Waitall(send_requests_sparse.size(),
1848  &send_requests_sparse[0],MPI_STATUS_IGNORE);
1849  }
1850  if (recv_requests_sparse.size()>0)
1851  {
1852  MPI_Waitall(recv_requests_sparse.size(),
1853  &recv_requests_sparse[0],MPI_STATUS_IGNORE);
1854  }
1855  for (unsigned p = 0; p < nproc; p++)
1856  {
1857  delete[] index_in_dof_block_sparse_send[p];
1858  delete[] dof_number_sparse_send[p];
1859  }
1860  delete[] index_in_dof_block_sparse_send;
1861  delete[] dof_number_sparse_send;
1862  delete[] nreq_sparse;
1863  delete[] nreq_sparse_for_proc;
1864 #endif
1865 
1866  // Next we assemble the lookup schemes for the rows
1867  // if the matrix is not distributed then we assemble Global_index
1868  // if the matrix is distributed then Rows_to_send_..., Rows_to_recv_... etc.
1869  if (!distributed)
1870  {
1871  // Resize the storage.
1872  Global_index.resize(Internal_nblock_types);
1873  for (unsigned b = 0; b < Internal_nblock_types; b++)
1874  {
1875  Global_index[b].resize(Internal_block_distribution_pt[b]->nrow());
1876  }
1877 
1878  // Compute:
1879  unsigned nrow=this->master_nrow();
1880  for (unsigned i=0;i<nrow;i++)
1881  {
1882  // the dof type number;
1883  int dof_number=this->internal_dof_number(i);
1884  if (dof_number>=0)
1885  {
1886 
1887  // the block number;
1888  unsigned block_number = Dof_number_to_block_number_lookup[dof_number];
1889 
1890  // the index in the block.
1891  unsigned index_in_block=0;
1892  unsigned ptr=0;
1893  while (int(Block_number_to_dof_number_lookup[block_number][ptr])
1894  !=dof_number)
1895  {
1896  index_in_block+=
1897  internal_dof_block_dimension(Block_number_to_dof_number_lookup[
1898  block_number]
1899  [ptr]);
1900  ptr++;
1901  }
1902  index_in_block+=internal_index_in_dof(i);
1903  Global_index[block_number][index_in_block]=i;
1904  }
1905  }
1906  }
1907  // otherwise the matrix is distributed
1908  else
1909  {
1910 #ifdef OOMPH_HAS_MPI
1911 
1912  // the pointer to the master distribution
1913  const LinearAlgebraDistribution* master_distribution_pt =
1914  this->master_distribution_pt();
1915 
1916  // resize the nrows... storage
1917  Nrows_to_send_for_get_block.resize(Internal_nblock_types,nproc);
1918  Nrows_to_send_for_get_block.initialise(0);
1919  Nrows_to_send_for_get_ordered.resize(nproc);
1920  Nrows_to_send_for_get_ordered.initialise(0);
1921 
1922  // loop over my rows
1923  unsigned nrow_local = master_distribution_pt->nrow_local();
1924  unsigned first_row = master_distribution_pt->first_row();
1925  for (unsigned i = 0; i < nrow_local; i++)
1926  {
1927 
1928  // the block number
1929  int b = this->internal_block_number(first_row + i);
1930 
1931  // check that the DOF i is associated with this preconditioner
1932  if (b >= 0)
1933  {
1934  // the block index
1935  unsigned j = this->internal_index_in_block(first_row + i);
1936 
1937  // the processor this row will be sent to
1938  unsigned block_p = 0;
1939  while(!(Internal_block_distribution_pt[b]->first_row(block_p) <= j &&
1940  (Internal_block_distribution_pt[b]->first_row(block_p) +
1941  Internal_block_distribution_pt[b]->nrow_local(block_p) > j)))
1942  {
1943  block_p++;
1944  }
1945 
1946  // and increment the counter
1947  Nrows_to_send_for_get_block(b,block_p)++;
1948  Nrows_to_send_for_get_ordered[block_p]++;
1949  }
1950  }
1951 
1952  // resize the storage for Nrows_to_recv
1953  Nrows_to_recv_for_get_block.resize(Internal_nblock_types,nproc);
1954  Nrows_to_recv_for_get_block.initialise(0);
1955  Nrows_to_recv_for_get_ordered.resize(nproc);
1956  Nrows_to_recv_for_get_ordered.initialise(0);
1957 
1958  // next we send the number of rows that will be sent by this processor
1959  Vector<unsigned*> nrows_to_send(nproc,0);
1960  Vector<unsigned*> nrows_to_recv(nproc,0);
1961  Vector<MPI_Request> send_requests_nrow;
1962  Vector<MPI_Request> recv_requests_nrow;
1963  Vector<unsigned> proc;
1964  for (unsigned p = 0; p < nproc; p++)
1965  {
1966  if (p != my_rank)
1967  {
1968  // send
1969  proc.push_back(p);
1970  nrows_to_send[p] = new unsigned[Internal_nblock_types];
1971  for (unsigned b = 0; b < Internal_nblock_types; b++)
1972  {
1973  nrows_to_send[p][b] =
1974  Nrows_to_send_for_get_block(b,p);
1975  }
1976  MPI_Request s_req;
1977  MPI_Isend(nrows_to_send[p],Internal_nblock_types,MPI_UNSIGNED,p,3,
1978  comm_pt()->mpi_comm(),&s_req);
1979  send_requests_nrow.push_back(s_req);
1980 
1981  // recv
1982  nrows_to_recv[p] = new unsigned[Internal_nblock_types];
1983  MPI_Request r_req;
1984  MPI_Irecv(nrows_to_recv[p],Internal_nblock_types,MPI_UNSIGNED,p,3,
1985  comm_pt()->mpi_comm(),&r_req);
1986  recv_requests_nrow.push_back(r_req);
1987  }
1988  // send to self
1989  else
1990  {
1991  for (unsigned b = 0; b < Internal_nblock_types; b++)
1992  {
1993  Nrows_to_recv_for_get_block(b,p) =
1994  Nrows_to_send_for_get_block(b,p);
1995  }
1996  Nrows_to_recv_for_get_ordered[p] = Nrows_to_send_for_get_ordered[p];
1997  }
1998  }
1999 
2000  // create some temporary storage for the global row indices that will
2001  // be received from another processor.
2002  DenseMatrix<int*> block_rows_to_send(Internal_nblock_types,nproc,0);
2003  Vector<int*> ordered_rows_to_send(nproc,0);
2004 
2005  // resize the rows... storage
2006  Rows_to_send_for_get_block.resize(Internal_nblock_types,nproc);
2007  Rows_to_send_for_get_block.initialise(0);
2008  Rows_to_send_for_get_ordered.resize(nproc);
2009  Rows_to_send_for_get_ordered.initialise(0);
2010  Rows_to_recv_for_get_block.resize(Internal_nblock_types,nproc);
2011  Rows_to_recv_for_get_block.initialise(0);
2012 
2013  // resize the storage
2014  for (unsigned p = 0; p < nproc; p++)
2015  {
2016  for (unsigned b = 0; b < Internal_nblock_types; b++)
2017  {
2018  Rows_to_send_for_get_block(b,p)
2019  = new int[Nrows_to_send_for_get_block(b,p)];
2020  if (p != my_rank)
2021  {
2022  block_rows_to_send(b,p)
2023  = new int[Nrows_to_send_for_get_block(b,p)];
2024  }
2025  else
2026  {
2027  Rows_to_recv_for_get_block(b,p)
2028  = new int[Nrows_to_send_for_get_block(b,p)];
2029  }
2030  }
2031  Rows_to_send_for_get_ordered[p]
2032  = new int [Nrows_to_send_for_get_ordered[p]];
2033  }
2034 
2035 
2036 
2037  // loop over my rows to allocate the nrows
2038  DenseMatrix<unsigned> ptr_block(Internal_nblock_types,nproc,0);
2039  for (unsigned i = 0; i < nrow_local; i++)
2040  {
2041  // the block number
2042  int b = this->internal_block_number(first_row + i);
2043 
2044  // check that the DOF i is associated with this preconditioner
2045  if (b >= 0)
2046  {
2047 
2048  // the block index
2049  unsigned j = this->internal_index_in_block(first_row + i);
2050 
2051  // the processor this row will be sent to
2052  unsigned block_p = 0;
2053  while(!(Internal_block_distribution_pt[b]->first_row(block_p) <= j &&
2054  (Internal_block_distribution_pt[b]->first_row(block_p) +
2055  Internal_block_distribution_pt[b]->nrow_local(block_p) > j)))
2056  {
2057  block_p++;
2058  }
2059 
2060  // and store the row
2061  Rows_to_send_for_get_block(b,block_p)[ptr_block(b,block_p)] = i;
2062  if (block_p != my_rank)
2063  {
2064  block_rows_to_send(b,block_p)[ptr_block(b,block_p)]
2065  = j - Internal_block_distribution_pt[b]->first_row(block_p);
2066  }
2067  else
2068  {
2069  Rows_to_recv_for_get_block(b,block_p)[ptr_block(b,block_p)]
2070  = j - Internal_block_distribution_pt[b]->first_row(block_p);
2071  }
2072  ptr_block(b,block_p)++;
2073  }
2074  }
2075 
2076  // next block ordered
2077  for (unsigned p = 0; p < nproc; ++p)
2078  {
2079  int pt = 0;
2080  for (unsigned b = 0; b < Internal_nblock_types; ++b)
2081  {
2082 
2083  for (unsigned i = 0; i < Nrows_to_send_for_get_block(b,p); ++i)
2084  {
2085  Rows_to_send_for_get_ordered[p][pt] =
2086  Rows_to_send_for_get_block(b,p)[i];
2087  pt++;
2088  }
2089  }
2090  }
2091 
2092  // next process the nrow recvs as they complete
2093 
2094  // recv and store the data
2095  unsigned c = recv_requests_nrow.size();
2096  while (c > 0)
2097  {
2098 
2099  // wait for any communication to finish
2100  int req_number;
2101  MPI_Waitany(c,&recv_requests_nrow[0],&req_number,MPI_STATUS_IGNORE);
2102  recv_requests_nrow.erase(recv_requests_nrow.begin()+req_number);
2103  c--;
2104 
2105  // determine the source processor
2106  unsigned p = proc[req_number];
2107  proc.erase(proc.begin()+req_number);
2108 
2109  // copy the data to its final storage
2110  Nrows_to_recv_for_get_ordered[p]=0;
2111  for (unsigned b = 0; b < Internal_nblock_types; b++)
2112  {
2113  Nrows_to_recv_for_get_block(b,p) = nrows_to_recv[p][b];
2114  Nrows_to_recv_for_get_ordered[p] += nrows_to_recv[p][b];
2115  }
2116 
2117  // and clear
2118  delete[] nrows_to_recv[p];
2119  }
2120 
2121  // resize the storage for the incoming rows data
2122  Rows_to_recv_for_get_ordered.resize(nproc,0);
2123  for (unsigned p = 0; p < nproc; p++)
2124  {
2125  if (p != my_rank)
2126  {
2127  for (unsigned b = 0; b < Internal_nblock_types; b++)
2128  {
2129  Rows_to_recv_for_get_block(b,p)
2130  = new int[Nrows_to_recv_for_get_block(b,p)];
2131  }
2132  }
2133  }
2134 
2135  // compute the number of sends and recv from this processor
2136  // to each other processor
2137  Vector<unsigned> nsend_for_rows(nproc,0);
2138  Vector<unsigned> nrecv_for_rows(nproc,0);
2139  for (unsigned p = 0; p < nproc; p++)
2140  {
2141  if (p != my_rank)
2142  {
2143  for (unsigned b = 0; b < Internal_nblock_types; b++)
2144  {
2145  if (Nrows_to_send_for_get_block(b,p) > 0)
2146  {
2147  nsend_for_rows[p]++;
2148  }
2149  if (Nrows_to_recv_for_get_block(b,p) > 0)
2150  {
2151  nrecv_for_rows[p]++;
2152  }
2153  }
2154  }
2155  }
2156 
2157  // finally post the sends and recvs
2158  MPI_Aint base_displacement;
2159  MPI_Address(matrix_pt(),&base_displacement);
2160  Vector<MPI_Request> req_rows;
2161  for (unsigned p = 0; p < nproc; p++)
2162  {
2163  if (p != my_rank)
2164  {
2165  // send
2166  if (nsend_for_rows[p] > 0)
2167  {
2168  MPI_Datatype send_types[nsend_for_rows[p]];
2169  MPI_Aint send_displacements[nsend_for_rows[p]];
2170  int send_sz[nsend_for_rows[p]];
2171  unsigned send_ptr = 0;
2172  for (unsigned b = 0; b < Internal_nblock_types; b++)
2173  {
2174  if (Nrows_to_send_for_get_block(b,p) > 0)
2175  {
2176  MPI_Type_contiguous(Nrows_to_send_for_get_block(b,p),
2177  MPI_INT,&send_types[send_ptr]);
2178  MPI_Type_commit(&send_types[send_ptr]);
2179  MPI_Address(block_rows_to_send(b,p),
2180  &send_displacements[send_ptr]);
2181  send_displacements[send_ptr] -= base_displacement;
2182  send_sz[send_ptr] = 1;
2183  send_ptr++;
2184  }
2185  }
2186  MPI_Datatype final_send_type;
2187  MPI_Type_struct(nsend_for_rows[p],send_sz,send_displacements,
2188  send_types,&final_send_type);
2189  MPI_Type_commit(&final_send_type);
2190  for (unsigned i = 0; i < nsend_for_rows[p]; i++)
2191  {
2192  MPI_Type_free(&send_types[i]);
2193  }
2194  MPI_Request send_req;
2195  MPI_Isend(matrix_pt(),1,final_send_type,p,4,
2196  comm_pt()->mpi_comm(),&send_req);
2197  req_rows.push_back(send_req);
2198  MPI_Type_free(&final_send_type);
2199  }
2200 
2201  // recv
2202  if (nrecv_for_rows[p] > 0)
2203  {
2204  MPI_Datatype recv_types[nrecv_for_rows[p]];
2205  MPI_Aint recv_displacements[nrecv_for_rows[p]];
2206  int recv_sz[nrecv_for_rows[p]];
2207  unsigned recv_ptr = 0;
2208  for (unsigned b = 0; b < Internal_nblock_types; b++)
2209  {
2210  if (Nrows_to_recv_for_get_block(b,p) > 0)
2211  {
2212  MPI_Type_contiguous(Nrows_to_recv_for_get_block(b,p),
2213  MPI_INT,&recv_types[recv_ptr]);
2214  MPI_Type_commit(&recv_types[recv_ptr]);
2215  MPI_Address(Rows_to_recv_for_get_block(b,p),
2216  &recv_displacements[recv_ptr]);
2217  recv_displacements[recv_ptr] -= base_displacement;
2218  recv_sz[recv_ptr] = 1;
2219  recv_ptr++;
2220  }
2221  }
2222  MPI_Datatype final_recv_type;
2223  MPI_Type_struct(nrecv_for_rows[p],recv_sz,recv_displacements,
2224  recv_types,&final_recv_type);
2225  MPI_Type_commit(&final_recv_type);
2226  for (unsigned i = 0; i < nrecv_for_rows[p]; i++)
2227  {
2228  MPI_Type_free(&recv_types[i]);
2229  }
2230  MPI_Request recv_req;
2231  MPI_Irecv(matrix_pt(),1,final_recv_type,p,4,
2232  comm_pt()->mpi_comm(),&recv_req);
2233  req_rows.push_back(recv_req);
2234  MPI_Type_free(&final_recv_type);
2235  }
2236  }
2237  }
2238 
2239  // cleaning up Waitalls
2240 
2241 
2242  // wait for the recv requests so we can compute
2243  // Nrows_to_recv_for_get_ordered
2244  unsigned n_req_rows = req_rows.size();
2245  if (n_req_rows)
2246  {
2247  MPI_Waitall(n_req_rows,&req_rows[0],MPI_STATUS_IGNORE);
2248  }
2249 
2250  // resize the storage
2251  Rows_to_recv_for_get_ordered.resize(nproc);
2252  Rows_to_recv_for_get_ordered.initialise(0);
2253 
2254  // construct block offset
2255  Vector<int> vec_offset(Internal_nblock_types,0);
2256  for (unsigned b = 1; b < Internal_nblock_types; ++b)
2257  {
2258  vec_offset[b]=vec_offset[b-1]+Internal_block_distribution_pt[b-1]->nrow_local();
2259  }
2260 
2261  //
2262  for (unsigned p = 0; p < nproc; p++)
2263  {
2264  int pt = 0;
2265  Rows_to_recv_for_get_ordered[p]
2266  = new int[Nrows_to_recv_for_get_ordered[p]];
2267  for (unsigned b = 0; b < Internal_nblock_types; b++)
2268  {
2269  for (unsigned i = 0; i < Nrows_to_recv_for_get_block(b,p); i++)
2270  {
2271  Rows_to_recv_for_get_ordered[p][pt] =
2272  Rows_to_recv_for_get_block(b,p)[i]+vec_offset[b];
2273  pt++;
2274  }
2275  }
2276  }
2277 
2278  // clean up
2279  for (unsigned p = 0; p < nproc; p++)
2280  {
2281  if (p!= my_rank)
2282  {
2283  for (unsigned b = 0; b < Internal_nblock_types; b++)
2284  {
2285  delete[] block_rows_to_send(b,p);
2286  }
2287  if (Nrows_to_send_for_get_ordered[p] > 0)
2288  {
2289  delete[] ordered_rows_to_send[p];
2290  }
2291  }
2292  }
2293 
2294  // and the send reqs
2295  unsigned n_req_send_nrow = send_requests_nrow.size();
2296  if (n_req_send_nrow)
2297  {
2298  MPI_Waitall(n_req_send_nrow,&send_requests_nrow[0],MPI_STATUS_IGNORE);
2299  }
2300  for (unsigned p = 0; p < nproc; p++)
2301  {
2302  delete[] nrows_to_send[p];
2303  }
2304 #endif
2305  }
2306 
2307  // If we asked for output of blocks to a file then do it.
2308  if(block_output_on())
2309  output_blocks_to_files(Output_base_filename);
2310  }
2311 
2312  //============================================================================
2313  //??ds
2314  /// \short Function to turn this preconditioner into a
2315  /// subsidiary preconditioner that operates within a bigger
2316  /// "master block preconditioner (e.g. a Navier-Stokes 2x2 block
2317  /// preconditioner dealing with the fluid sub-blocks within a
2318  /// 3x3 FSI preconditioner. Once this is done the master block
2319  /// preconditioner deals with the block setup etc.
2320  /// The vector block_map must specify the dof number in the
2321  /// master preconditioner that corresponds to a block number in this
2322  /// preconditioner. ??ds horribly misleading comment!
2323  /// The length of the vector is used to determine the number of
2324  /// blocks in this preconditioner therefore it must be correctly sized.
2325  /// This calls the other turn_into_subsidiary_block_preconditioner(...)
2326  /// function providing an empty doftype_to_doftype_map vector.
2327  //============================================================================
2328  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
2330  (BlockPreconditioner<MATRIX>* master_block_prec_pt,
2331  const Vector<unsigned>& doftype_in_master_preconditioner_coarse)
2332  {
2333  // Create the identity dof_coarsen_map
2334  Vector<Vector<unsigned> > doftype_coarsen_map_coarse;
2335  unsigned doftype_in_master_preconditioner_coarse_size
2336  = doftype_in_master_preconditioner_coarse.size();
2337 
2338  for (unsigned dof_i = 0; dof_i
2339  < doftype_in_master_preconditioner_coarse_size; dof_i++)
2340  {
2341  // Create a vector of size 1 and value i,
2342  // then push it into the dof_coarsen_map vector.
2343  Vector<unsigned> tmp_vec(1,dof_i);
2344  doftype_coarsen_map_coarse.push_back(tmp_vec);
2345  }
2346 
2347  // Call the other turn_into_subsidiary_block_preconditioner function.
2348  turn_into_subsidiary_block_preconditioner(
2349  master_block_prec_pt,
2350  doftype_in_master_preconditioner_coarse,
2351  doftype_coarsen_map_coarse);
2352  }
2353 
2354 
2355  //============================================================================
2356  /// \short Function to turn this block preconditioner into a
2357  /// subsidiary block preconditioner that operates within a bigger
2358  /// master block preconditioner (e.g. a Navier-Stokes 2x2 block
2359  /// preconditioner dealing with the fluid sub-blocks within a
2360  /// 3x3 FSI preconditioner. Once this is done the master block
2361  /// preconditioner deals with the block setup etc.
2362  ///
2363  /// The vector doftype_map must specify the dof type in the
2364  /// master preconditioner that corresponds to a dof type in this block
2365  /// preconditioner.
2366  ///
2367  /// In general, we want:
2368  /// doftype_map[doftype in subsidiary prec] = doftype in master prec.
2369  ///
2370  /// It tells this block preconditioner which dof types of the master
2371  /// block preconditioner it is working with.
2372  ///
2373  /// The length of the vector is used to determine the number of
2374  /// dof types in THIS block preconditioner therefore it must be correctly
2375  /// sized.
2376  ///
2377  /// For example, let the master block preconditioner have 5 dof types in total
2378  /// and a 1-4 dof type splitting where the block (0,0) corresponds to
2379  /// dof type 0 and the block (1,1) corresponds to dof types 1, 2, 3 and 4
2380  /// (i.e. it would have given to block_setup the vector [0,1,1,1,1]).
2381  /// Furthermore, it solves (1,1) block with subsidiary block preconditioner.
2382  /// Then the doftype_map passed to this function of the subsidiary block
2383  /// preconditioner would be [1, 2, 3, 4].
2384  ///
2385  /// Dof type coarsening (following on from the example above):
2386  /// Let the subsidiary block preconditioner (THIS block preconditioner)
2387  /// only works with two DOF types, then the master block preconditioner must
2388  /// "coarsen" the dof types by providing the optional argument
2389  /// doftype_coarsen_map vector.
2390  ///
2391  /// The doftype_coarsen_map vector (in this case) might be [[0,1], [2,3]]
2392  /// telling the subsidiary block preconditioner that the SUBSIDIARY dof types
2393  /// 0 and 1 should be treated as dof type 0 and the subsidiary dof types 2
2394  /// and 3 should be treated as subsidiary dof type 1.
2395  ///
2396  /// If no doftype_coarsen_map vector is provided, then the identity is
2397  /// used automatically (see the turn_into_subsidiary_block_preconditioner(...)
2398  /// function with only two arguments). In the above case, the identity
2399  /// doftype_coarsen_map vector for the subsidiary block preconditioner
2400  /// would be the 2D vector [[0], [1], [2], [3]] which means
2401  /// dof type 0 is treated as dof type 0,
2402  /// dof type 1 is treated as dof type 1,
2403  /// dof type 2 is treated as dof type 2, and
2404  /// dof type 3 is treated as dof type 3.
2405  //============================================================================
2406  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
2408  (BlockPreconditioner<MATRIX>* master_block_prec_pt,
2409  const Vector<unsigned>& doftype_in_master_preconditioner_coarse,
2410  const Vector<Vector<unsigned> > &doftype_coarsen_map_coarse)
2411  {
2412 
2413 
2414  // Set the master block preconditioner pointer
2415  Master_block_preconditioner_pt = master_block_prec_pt;
2416 
2417  // Set the Doftype_coarsen_map_coarse.
2418  Doftype_coarsen_map_coarse = doftype_coarsen_map_coarse;
2419 
2420  Doftype_in_master_preconditioner_coarse =
2421  doftype_in_master_preconditioner_coarse;
2422  } // end of turn_into_subsidiary_block_preconditioner(...)
2423 
2424 
2425  //============================================================================
2426  /// Determine the size of the matrix blocks and setup the
2427  /// lookup schemes relating the global degrees of freedom with
2428  /// their "blocks" and their indices (row/column numbers) in those
2429  /// blocks.
2430  /// The distributions of the preconditioner and the blocks are
2431  /// automatically specified (and assumed to be uniform) at this
2432  /// stage.
2433  /// This method should be used if each DOF type corresponds to a
2434  /// unique block type.
2435  //============================================================================
2436  template<typename MATRIX>
2438  {
2439 
2440 #ifdef PARANOID
2441 
2442  // Subsidiary preconditioners don't really need the meshes
2443  if (this->is_master_block_preconditioner())
2444  {
2445  std::ostringstream err_msg;
2446  unsigned n=nmesh();
2447  if (n==0)
2448  {
2449  err_msg << "No meshes have been set for this block preconditioner!\n"
2450  << "Set one with set_nmesh(...), set_mesh(...)" << std::endl;
2451  throw OomphLibError(err_msg.str(),
2452  OOMPH_CURRENT_FUNCTION,
2453  OOMPH_EXCEPTION_LOCATION);
2454  for (unsigned m=0;m<n;m++)
2455  {
2456  if (Mesh_pt[m]==0)
2457  {
2458  err_msg << "The mesh pointer to mesh " << m << " is null!\n"
2459  << "Set a non-null one with set_mesh(...)" << std::endl;
2460  throw OomphLibError(err_msg.str(),
2461  OOMPH_CURRENT_FUNCTION,
2462  OOMPH_EXCEPTION_LOCATION);
2463 
2464  }
2465  }
2466  }
2467  }
2468 #endif
2469 
2470  // Get the number of dof types.
2471  unsigned internal_n_dof_types = ndof_types();
2472 
2473  // Build the dof to block map - assume that each type of dof corresponds
2474  // to a different type of block.
2475  Vector<unsigned> dof_to_block_lookup(internal_n_dof_types);
2476  for (unsigned i = 0; i < internal_n_dof_types; i++)
2477  {
2478  dof_to_block_lookup[i] = i;
2479  }
2480 
2481  // call the block setup method
2482  this->block_setup(dof_to_block_lookup);
2483  }
2484 
2485 
2486  //============================================================================
2487  /// Get the block matrices required for the block preconditioner. Takes a
2488  /// pointer to a matrix of bools that indicate if a specified sub-block is
2489  /// required for the preconditioning operation. Computes the required block
2490  /// matrices, and stores pointers to them in the matrix block_matrix_pt. If an
2491  /// entry in block_matrix_pt is equal to NULL that sub-block has not been
2492  /// requested and is therefore not available.
2493  //============================================================================
2494  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
2496  DenseMatrix<MATRIX*>& block_matrix_pt) const
2497  {
2498 
2499  // Cache number of block types
2500  const unsigned n_block_types = nblock_types();
2501 
2502 #ifdef PARANOID
2503  // If required blocks matrix pointer is not the correct size then abort.
2504  if ((required_blocks.nrow() != n_block_types) ||
2505  (required_blocks.ncol() != n_block_types))
2506  {
2507 
2508  std::ostringstream error_message;
2509  error_message << "The size of the matrix of bools required_blocks "
2510  << "(which indicates which blocks are required) is not the "
2511  << "right size, required_blocks is "
2512  << required_blocks.ncol()
2513  << " x " << required_blocks.nrow() << ", whereas it should "
2514  << "be " << n_block_types << " x " << n_block_types;
2515  throw OomphLibError(error_message.str(),
2516  OOMPH_CURRENT_FUNCTION,
2517  OOMPH_EXCEPTION_LOCATION);
2518  }
2519 
2520  // If block matrix pointer is not the correct size then abort.
2521  if ((block_matrix_pt.nrow() != n_block_types) ||
2522  (block_matrix_pt.ncol() != n_block_types))
2523  {
2524  std::ostringstream error_message;
2525  error_message << "The size of the block matrix pt is not the "
2526  << "right size, block_matrix_pt is "
2527  << block_matrix_pt.ncol()
2528  << " x " << block_matrix_pt.nrow() << ", whereas it should "
2529  << "be " << n_block_types << " x " << n_block_types;
2530  throw OomphLibError(error_message.str(),
2531  OOMPH_CURRENT_FUNCTION,
2532  OOMPH_EXCEPTION_LOCATION);
2533  }
2534 
2535 #endif
2536 
2537  // Loop over the blocks
2538  for (unsigned i = 0; i < n_block_types; i++)
2539  {
2540  for (unsigned j = 0; j < n_block_types; j++)
2541  {
2542  // If block(i,j) is required then create a matrix and fill it in.
2543  if (required_blocks(i,j))
2544  {
2545  //??ds might want to remove this use of new as well?
2546  block_matrix_pt(i,j) = new MATRIX;
2547  get_block(i, j, *block_matrix_pt(i,j));
2548  }
2549 
2550  // Otherwise set pointer to null.
2551  else
2552  {
2553  block_matrix_pt(i,j) = 0;
2554  }
2555  }
2556  }
2557  }
2558 
2559  //============================================================================
2560  /// Takes the naturally ordered vector and extracts the blocks
2561  /// indicated by the block number (the values) in the Vector
2562  /// block_vec_number all at once, then concatenates them without
2563  /// communication. Here, the values in block_vec_number is the block number
2564  /// in the current preconditioner.
2565  /// This is a non-const function because distributions may be created
2566  /// and stored in Auxiliary_block_distribution_pt for future use.
2567  //============================================================================
2568  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
2570  const DoubleVector& v, DoubleVector& w)
2571  {
2572 #ifdef PARANOID
2573 
2574  // Check if v is built.
2575  if (!v.built())
2576  {
2577  std::ostringstream err_msg;
2578  err_msg << "The distribution of the global vector v must be setup.";
2579  throw OomphLibError(err_msg.str(),
2580  OOMPH_CURRENT_FUNCTION,
2581  OOMPH_EXCEPTION_LOCATION);
2582  }
2583 
2584  // v must have the same distribution as the upper-most master block
2585  // preconditioner, since the upper-most master block preconditioner
2586  // should have the same distribution as the matrix pointed to
2587  // by matrix_pt().
2588  if (*(v.distribution_pt()) != *(this->master_distribution_pt()))
2589  {
2590  std::ostringstream err_msg;
2591  err_msg << "The distribution of the global vector v must match the "
2592  << " specified master_distribution_pt(). \n"
2593  << "i.e. Distribution_pt in the master preconditioner";
2594  throw OomphLibError(err_msg.str(),
2595  OOMPH_CURRENT_FUNCTION,
2596  OOMPH_EXCEPTION_LOCATION);
2597  }
2598 
2599  // Check to see if there are more blocks defined in the block_vec_number
2600  // vector than the number of block types. This is not allowed.
2601  const unsigned para_nblock_types = nblock_types();
2602  const unsigned para_block_vec_number_size = block_vec_number.size();
2603  if(para_block_vec_number_size > para_nblock_types)
2604  {
2605  std::ostringstream err_msg;
2606  err_msg << "You have requested " << para_block_vec_number_size
2607  << " number of blocks, (block_vec_number.size() is "
2608  << para_block_vec_number_size << ").\n"
2609  << "But there are only " << para_nblock_types << " nblock_types.\n"
2610  << "Please make sure that block_vec_number is correctly sized.\n";
2611  throw OomphLibError(err_msg.str(),
2612  OOMPH_CURRENT_FUNCTION,
2613  OOMPH_EXCEPTION_LOCATION);
2614  }
2615 
2616  // Check if any block numbers defined in block_vec_number is equal to or
2617  // greater than the number of block types.
2618  // E.g. if there are 5 block types, we can only have block numbers:
2619  // 0, 1, 2, 3 and 4.
2620  for (unsigned i = 0; i < para_block_vec_number_size; i++)
2621  {
2622  const unsigned para_required_block = block_vec_number[i];
2623  if(para_required_block >= para_nblock_types)
2624  {
2625  std::ostringstream err_msg;
2626  err_msg << "block_vec_number[" << i << "] is " << para_required_block
2627  << ".\n"
2628  << "But there are only " << para_nblock_types
2629  << " nblock_types.\n";
2630  throw OomphLibError(err_msg.str(),
2631  OOMPH_CURRENT_FUNCTION,
2632  OOMPH_EXCEPTION_LOCATION);
2633  }
2634  }
2635 
2636  // Check that no block number is inserted twice.
2637  std::set<unsigned> para_set;
2638  for (unsigned b = 0; b < para_block_vec_number_size; b++)
2639  {
2640  std::pair<std::set<unsigned>::iterator,bool> para_set_ret;
2641  para_set_ret = para_set.insert(block_vec_number[b]);
2642 
2643  if(!para_set_ret.second)
2644  {
2645  std::ostringstream err_msg;
2646  err_msg << "Error: the block number "
2647  << block_vec_number[b]
2648  << " appears twice.\n";
2649  throw OomphLibError(err_msg.str(),
2650  OOMPH_CURRENT_FUNCTION,
2651  OOMPH_EXCEPTION_LOCATION);
2652  }
2653  }
2654 #endif
2655 
2656  // Number of blocks to get.
2657  const unsigned n_block = block_vec_number.size();
2658 
2659  // Each block is made of dof types. We get the most fine grain dof types.
2660  // Most fine grain in the sense that these are the dof types that belongs
2661  // in this block before any coarsening of dof types has taken place.
2662  // The ordering of the dof types matters, this is handled properly when
2663  // creating the Block_to_dof_map_fine vector and must be respected here.
2664  // I.e. we cannot arbitrarily insert dof types (even if they are correct)
2665  // in the vector most_fine_grain_dof.
2666  Vector<unsigned> most_fine_grain_dof;
2667  for (unsigned b = 0; b < n_block; b++)
2668  {
2669  const unsigned mapped_b = block_vec_number[b];
2670  most_fine_grain_dof.insert(
2671  most_fine_grain_dof.end(),
2672  Block_to_dof_map_fine[mapped_b].begin(),
2673  Block_to_dof_map_fine[mapped_b].end());
2674  }
2675 
2676  // Get all the dof level vectors in one go.
2677  Vector<DoubleVector> dof_block_vector;
2678  internal_get_block_vectors(most_fine_grain_dof,
2679  v,dof_block_vector);
2680 
2681  // Next we need to build the output DoubleVector w with the correct
2682  // distribution: the concatenation of the distributions of all the
2683  // dof-level vectors. This is the same as the concatenation of the
2684  // distributions of the blocks within this preconditioner.
2685  //
2686  // So we first check if it exists already, if not, we create it and
2687  // store it for future use. We store it because concatenation of
2688  // distributions requires communication, so concatenation of
2689  // distributions on-the-fly should be avoided.
2690  std::map<Vector<unsigned>,
2691  LinearAlgebraDistribution* >::const_iterator iter;
2692 
2693  // Attempt to get an iterator pointing to the pair with the value
2694  // block_vec_number.
2695  iter = Auxiliary_block_distribution_pt.find(block_vec_number);
2696 
2697  if(iter != Auxiliary_block_distribution_pt.end())
2698  // If it exists, build w with the distribution pointed to
2699  // by pair::second.
2700  {
2701  w.build(iter->second);
2702  }
2703  else
2704  // Else, we need to create the distribution and store it in
2705  // Auxiliary_block_distribution_pt.
2706  {
2707  Vector<LinearAlgebraDistribution*> tmp_vec_dist_pt(n_block,0);
2708  for (unsigned b = 0; b < n_block; b++)
2709  {
2710  tmp_vec_dist_pt[b] = Block_distribution_pt[block_vec_number[b]];
2711  }
2712 
2713  // Note that the distribution is created with new but not deleted here.
2714  // This is handled in the clean up functions.
2717  *tmp_dist_pt);
2718 
2719  // Store the pair of Vector<unsigned> and LinearAlgebraDistribution*
2720  insert_auxiliary_block_distribution(block_vec_number,tmp_dist_pt);
2721 
2722  // Build w.
2723  w.build(tmp_dist_pt);
2724  }
2725 
2726  // Now concatenate all the dof level vectors into the vector w.
2728  dof_block_vector,w);
2729 
2730  } // get_concatenated_block_vector(...)
2731 
2732  //============================================================================
2733  /// \short Takes concatenated block ordered vector, b, and copies its
2734  // entries to the appropriate entries in the naturally ordered vector, v.
2735  // Here the values in block_vec_number indicates which blocks the vector
2736  // b is a concatenation of. The block number are those in the current
2737  // preconditioner. If the preconditioner is a subsidiary block
2738  // preconditioner the other entries in v that are not associated with it
2739  // are left alone.
2740  //============================================================================
2741  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
2743  const DoubleVector& w, DoubleVector& v) const
2744  {
2745 #ifdef PARANOID
2746 
2747  // Check if v is built.
2748  if (!v.built())
2749  {
2750  std::ostringstream err_msg;
2751  err_msg << "The distribution of the global vector v must be setup.";
2752  throw OomphLibError(err_msg.str(),
2753  OOMPH_CURRENT_FUNCTION,
2754  OOMPH_EXCEPTION_LOCATION);
2755  }
2756 
2757  // v must have the same distribution as the upper-most master block
2758  // preconditioner, since the upper-most master block preconditioner
2759  // should have the same distribution as the matrix pointed to
2760  // by matrix_pt().
2761  if (*(v.distribution_pt()) != *(this->master_distribution_pt()))
2762  {
2763  std::ostringstream err_msg;
2764  err_msg << "The distribution of the global vector v must match the "
2765  << " specified master_distribution_pt(). \n"
2766  << "i.e. Distribution_pt in the master preconditioner";
2767  throw OomphLibError(err_msg.str(),
2768  OOMPH_CURRENT_FUNCTION,
2769  OOMPH_EXCEPTION_LOCATION);
2770  }
2771 
2772  // Check to see if there are more blocks defined in the block_vec_number
2773  // vector than the number of block types. This is not allowed.
2774  const unsigned para_block_vec_number_size = block_vec_number.size();
2775  const unsigned para_n_block = nblock_types();
2776  if(para_block_vec_number_size > para_n_block)
2777  {
2778  std::ostringstream err_msg;
2779  err_msg << "Trying to return " << para_block_vec_number_size
2780  << " block vectors.\n"
2781  << "But there are only " << para_n_block << " block types.\n";
2782  throw OomphLibError(err_msg.str(),
2783  OOMPH_CURRENT_FUNCTION,
2784  OOMPH_EXCEPTION_LOCATION);
2785  }
2786 
2787  // Check if any block numbers defined in block_vec_number is equal to or
2788  // greater than the number of block types.
2789  // E.g. if there are 5 block types, we can only have block numbers:
2790  // 0, 1, 2, 3 and 4.
2791  for (unsigned b = 0; b < para_block_vec_number_size; b++)
2792  {
2793  const unsigned para_required_block = block_vec_number[b];
2794  if(para_required_block > para_n_block)
2795  {
2796  std::ostringstream err_msg;
2797  err_msg << "block_vec_number[" << b << "] is " << para_required_block
2798  << ".\n"
2799  << "But there are only " << para_n_block << " block types.\n";
2800  throw OomphLibError(err_msg.str(),
2801  OOMPH_CURRENT_FUNCTION,
2802  OOMPH_EXCEPTION_LOCATION);
2803 
2804  }
2805  }
2806 
2807  // Check that no block number is inserted twice.
2808  std::set<unsigned> para_set;
2809  for (unsigned b = 0; b < para_block_vec_number_size; b++)
2810  {
2811  std::pair<std::set<unsigned>::iterator,bool> para_set_ret;
2812  para_set_ret = para_set.insert(block_vec_number[b]);
2813 
2814  if(!para_set_ret.second)
2815  {
2816  std::ostringstream err_msg;
2817  err_msg << "Error: the block number "
2818  << block_vec_number[b]
2819  << " appears twice.\n";
2820  throw OomphLibError(err_msg.str(),
2821  OOMPH_CURRENT_FUNCTION,
2822  OOMPH_EXCEPTION_LOCATION);
2823  }
2824  }
2825 
2826  // Check that w is built.
2827  if (!w.built())
2828  {
2829  std::ostringstream err_msg;
2830  err_msg << "The distribution of the block vector w must be setup.";
2831  throw OomphLibError(err_msg.str(),
2832  OOMPH_CURRENT_FUNCTION,
2833  OOMPH_EXCEPTION_LOCATION);
2834  }
2835 
2836  // Check that the distributions defined by block_vec_number is correct for
2837  // the distribution from w.
2838  // Recall that w is the concatenation of the block vectors defined by
2839  // the values in block_vec_number. We check that this is the case.
2840  Vector<LinearAlgebraDistribution*> para_vec_dist_pt(
2841  para_block_vec_number_size,0);
2842 
2843  for (unsigned b = 0; b < para_block_vec_number_size; b++)
2844  {
2845  para_vec_dist_pt[b] = Block_distribution_pt[block_vec_number[b]];
2846  }
2847 
2848  LinearAlgebraDistribution para_tmp_dist;
2849 
2851  para_tmp_dist);
2852 
2853  if(*w.distribution_pt() != para_tmp_dist)
2854  {
2855  std::ostringstream err_msg;
2856  err_msg << "The distribution of the block vector w does not match \n"
2857  << "the concatenation of the block distributions defined in \n"
2858  << "block_vec_number.\n";
2859  throw OomphLibError(err_msg.str(),
2860  OOMPH_CURRENT_FUNCTION,
2861  OOMPH_EXCEPTION_LOCATION);
2862  }
2863 #endif
2864 
2865  // Number of blocks to return.
2866  const unsigned n_block = block_vec_number.size();
2867 
2868  // Each block is made of dof types. We get the most fine grain dof types.
2869  // Most fine grain in the sense that these are the dof types that belongs
2870  // in this block before any coarsening of dof types has taken place.
2871  // The ordering of the dof types matters, this is handled properly when
2872  // creating the Block_to_dof_map_fine vector and must be respected here.
2873  // I.e. we cannot arbitrarily insert dof types (even if they are correct)
2874  // in the vector most_fine_grain_dof.
2875  Vector<unsigned> most_fine_grain_dof;
2876  for (unsigned b = 0; b < n_block; b++)
2877  {
2878  const unsigned mapped_b = block_vec_number[b];
2879  most_fine_grain_dof.insert(
2880  most_fine_grain_dof.end(),
2881  Block_to_dof_map_fine[mapped_b].begin(),
2882  Block_to_dof_map_fine[mapped_b].end());
2883  }
2884 
2885  // The number of most fine grain dof types associated with the blocks
2886  // defined by block_vec_number.
2887  const unsigned ndof = most_fine_grain_dof.size();
2888 
2889  // Build each dof level vector with the correct distribution.
2890  Vector<DoubleVector> dof_vector(ndof);
2891  for (unsigned d = 0; d < ndof; d++)
2892  {
2893  dof_vector[d].build(internal_block_distribution_pt(
2894  most_fine_grain_dof[d]));
2895  }
2896 
2897  // Perform the splitting of w into the most fine grain dof level vectors.
2899 
2900  // Return all the dof level vectors in one go.
2901  internal_return_block_vectors(most_fine_grain_dof,
2902  dof_vector,
2903  v);
2904  } // return_concatenated_block_vector(...)
2905 
2906  //============================================================================
2907  /// \short Takes the naturally ordered vector and rearranges it into a
2908  /// vector of sub vectors corresponding to the blocks, so s[b][i] contains
2909  /// the i-th entry in the vector associated with block b.
2910  /// Note: If the preconditioner is a subsidiary preconditioner then only the
2911  /// sub-vectors associated with the blocks of the subsidiary preconditioner
2912  /// will be included. Hence the length of v is master_nrow() whereas the
2913  /// total length of the s vectors is the sum of the lengths of the
2914  /// individual block vectors defined in block_vec_number.
2915  //============================================================================
2916  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
2917  get_block_vectors(const Vector<unsigned>& block_vec_number,
2918  const DoubleVector& v, Vector<DoubleVector >& s) const
2919  {
2920 #ifdef PARANOID
2921 
2922  // Check if v is built.
2923  if (!v.built())
2924  {
2925  std::ostringstream err_msg;
2926  err_msg << "The distribution of the global vector v must be setup.";
2927  throw OomphLibError(err_msg.str(),
2928  OOMPH_CURRENT_FUNCTION,
2929  OOMPH_EXCEPTION_LOCATION);
2930  }
2931 
2932  // v must have the same distribution as the upper-most master block
2933  // preconditioner, since the upper-most master block preconditioner
2934  // should have the same distribution as the matrix pointed to
2935  // by matrix_pt().
2936  if (*(v.distribution_pt()) != *(this->master_distribution_pt()))
2937  {
2938  std::ostringstream err_msg;
2939  err_msg << "The distribution of the global vector v must match the "
2940  << " specified master_distribution_pt(). \n"
2941  << "i.e. Distribution_pt in the master preconditioner";
2942  throw OomphLibError(err_msg.str(),
2943  OOMPH_CURRENT_FUNCTION,
2944  OOMPH_EXCEPTION_LOCATION);
2945  }
2946 
2947  // Check to see if there are more blocks defined in the block_vec_number
2948  // vector than the number of block types. This is not allowed.
2949  const unsigned para_nblock_types = nblock_types();
2950  const unsigned para_block_vec_number_size = block_vec_number.size();
2951  if(para_block_vec_number_size > para_nblock_types)
2952  {
2953  std::ostringstream err_msg;
2954  err_msg << "You have requested " << para_block_vec_number_size
2955  << " number of blocks, (block_vec_number.size() is "
2956  << para_block_vec_number_size << ").\n"
2957  << "But there are only " << para_nblock_types << " nblock_types.\n"
2958  << "Please make sure that block_vec_number is correctly sized.\n";
2959  throw OomphLibError(err_msg.str(),
2960  OOMPH_CURRENT_FUNCTION,
2961  OOMPH_EXCEPTION_LOCATION);
2962  }
2963 
2964  // Check if any block numbers defined in block_vec_number is equal to or
2965  // greater than the number of block types.
2966  // E.g. if there are 5 block types, we can only have block numbers:
2967  // 0, 1, 2, 3 and 4.
2968  for (unsigned i = 0; i < para_block_vec_number_size; i++)
2969  {
2970  const unsigned para_required_block = block_vec_number[i];
2971  if(para_required_block > para_nblock_types)
2972  {
2973  std::ostringstream err_msg;
2974  err_msg << "block_vec_number[" << i << "] is " << para_required_block
2975  << ".\n"
2976  << "But there are only " << para_nblock_types
2977  << " nblock_types.\n";
2978  throw OomphLibError(err_msg.str(),
2979  OOMPH_CURRENT_FUNCTION,
2980  OOMPH_EXCEPTION_LOCATION);
2981  }
2982  }
2983  // Check that no block number is inserted twice.
2984  std::set<unsigned> para_set;
2985  for (unsigned b = 0; b < para_block_vec_number_size; b++)
2986  {
2987  std::pair<std::set<unsigned>::iterator,bool> para_set_ret;
2988  para_set_ret = para_set.insert(block_vec_number[b]);
2989 
2990  if(!para_set_ret.second)
2991  {
2992  std::ostringstream err_msg;
2993  err_msg << "Error: the block number "
2994  << block_vec_number[b]
2995  << " appears twice.\n";
2996  throw OomphLibError(err_msg.str(),
2997  OOMPH_CURRENT_FUNCTION,
2998  OOMPH_EXCEPTION_LOCATION);
2999  }
3000  }
3001 #endif
3002 
3003  // Number of blocks to get.
3004  const unsigned n_block = block_vec_number.size();
3005  s.resize(n_block);
3006 
3007  // Each block is made of dof types. We get the most fine grain dof types.
3008  // Most fine grain in the sense that these are the dof types that belongs
3009  // in this block before any coarsening of dof types has taken place.
3010  // The ordering of the dof types matters, this is handled properly when
3011  // creating the Block_to_dof_map_fine vector and must be respected here.
3012  // I.e. we cannot arbitrarily insert dof types (even if they are correct)
3013  // in the vector most_fine_grain_dof.
3014  Vector<unsigned> most_fine_grain_dof;
3015  for (unsigned b = 0; b < n_block; b++)
3016  {
3017  const unsigned mapped_b = block_vec_number[b];
3018 
3019  most_fine_grain_dof.insert(
3020  most_fine_grain_dof.end(),
3021  Block_to_dof_map_fine[mapped_b].begin(),
3022  Block_to_dof_map_fine[mapped_b].end());
3023  }
3024 
3025  // Get all the dof level vectors in one go.
3026  Vector<DoubleVector> dof_vector;
3027  internal_get_block_vectors(most_fine_grain_dof,
3028  v,dof_vector);
3029 
3030  // For each block vector requested,
3031  // build the block s[b],
3032  // concatenate the corresponding dof vector
3033 
3034  // Since all the dof vectors are in dof_vector,
3035  // we need to loop through this.
3036  // The offset helps us loop through this.
3037  unsigned offset = 0;
3038 
3039  for (unsigned b = 0; b < n_block; b++)
3040  {
3041  // The actual block number required.
3042  const unsigned mapped_b = block_vec_number[b];
3043 
3044  // How many most fine grain dofs are in this block?
3045  const unsigned n_dof = Block_to_dof_map_fine[mapped_b].size();
3046 
3047  if(n_dof == 1)
3048  // No need to concatenate, just copy the DoubleVector.
3049  {
3050  s[b] = dof_vector[offset];
3051  }
3052  else
3053  // Concatenate the relevant dof vectors into s[b].
3054  {
3055  s[b].build(Block_distribution_pt[mapped_b],0);
3056  Vector<DoubleVector*> tmp_vec_pt(n_dof,0);
3057  for (unsigned vec_i = 0; vec_i < n_dof; vec_i++)
3058  {
3059  tmp_vec_pt[vec_i] = &dof_vector[offset + vec_i];
3060  }
3061 
3063  tmp_vec_pt,s[b]);
3064  }
3065 
3066  // Update the offset.
3067  offset += n_dof;
3068  }
3069  } // get_block_vectors(...)
3070 
3071 
3072  //============================================================================
3073  /// \short Takes the naturally ordered vector and rearranges it into a
3074  /// vector of sub vectors corresponding to the blocks, so s[b][i] contains
3075  /// the i-th entry in the vector associated with block b.
3076  /// Note: If the preconditioner is a subsidiary preconditioner then only the
3077  /// sub-vectors associated with the blocks of the subsidiary preconditioner
3078  /// will be included. Hence the length of v is master_nrow() whereas the
3079  /// total length of the s vectors is Nrow.
3080  /// This is simply a wrapper around the other get_block_vectors(...) function
3081  /// where the block_vec_number Vector is the identity, i.e.
3082  /// block_vec_number is [0, 1, ..., nblock_types - 1].
3083  //============================================================================
3084  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
3086  {
3087  // Get the number of blocks in this block preconditioner.
3088  const unsigned n_block = nblock_types();
3089 
3090  // Create the identity vector.
3091  Vector<unsigned> required_block(n_block,0);
3092  for (unsigned i = 0; i < n_block; i++)
3093  {
3094  required_block[i] = i;
3095  }
3096 
3097  // Call the other function which does the work.
3098  get_block_vectors(required_block,v,s);
3099  }
3100 
3101  //============================================================================
3102  /// \short Takes the naturally ordered vector and
3103  /// rearranges it into a vector of sub vectors corresponding to the blocks,
3104  /// so s[b][i] contains the i-th entry in the vector associated with block b.
3105  /// The block_vec_number indicates which blocks we want.
3106  /// These blocks and vectors are those corresponding to the internal blocks.
3107  /// Note: If the preconditioner is a subsidiary preconditioner then only the
3108  /// sub-vectors associated with the blocks of the subsidiary preconditioner
3109  /// will be included. Hence the length of v is master_nrow() whereas the
3110  /// total length of the s vectors is the sum of the Nrow of the sub vectors.
3111  //============================================================================
3112  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
3114  const Vector<unsigned>& block_vec_number, const DoubleVector& v,
3115  Vector<DoubleVector >& s) const
3116  {
3117 #ifdef PARANOID
3118  if (!v.built())
3119  {
3120  std::ostringstream error_message;
3121  error_message << "The distribution of the global vector v must be setup.";
3122  throw OomphLibError(error_message.str(),
3123  OOMPH_CURRENT_FUNCTION,
3124  OOMPH_EXCEPTION_LOCATION);
3125  }
3126  if (*(v.distribution_pt()) != *(this->master_distribution_pt()))
3127  {
3128  std::ostringstream error_message;
3129  error_message << "The distribution of the global vector v must match the "
3130  << " specified master_distribution_pt(). \n"
3131  << "i.e. Distribution_pt in the master preconditioner";
3132  throw OomphLibError(error_message.str(),
3133  OOMPH_CURRENT_FUNCTION,
3134  OOMPH_EXCEPTION_LOCATION);
3135  }
3136 #endif
3137 
3138  // Number of block types
3139  //const unsigned nblock = this->internal_nblock_types();
3140  const unsigned nblock = block_vec_number.size();
3141 
3142  // if + only one processor
3143  // + more than one processor but matrix_pt is not distributed
3144  // then use the serial get_block method
3145  if (this->distribution_pt()->communicator_pt()->nproc() == 1 ||
3146  !this->distribution_pt()->distributed())
3147  {
3148 
3149  // Vector of vectors for each section of residual vector
3150  s.resize(nblock);
3151 
3152  // pointer to the data in v
3153  const double* v_pt = v.values_pt();
3154 
3155  // setup the block vector and then insert the data
3156  for (unsigned b = 0; b < nblock; b++)
3157  {
3158  const unsigned required_block = block_vec_number[b];
3159  s[b].build(Internal_block_distribution_pt[required_block],0.0);
3160  double* s_pt = s[b].values_pt();
3161  unsigned nrow = s[b].nrow();
3162  for (unsigned i = 0; i < nrow; i++)
3163  {
3164  s_pt[i] = v_pt[this->Global_index[required_block][i]];
3165  }
3166  }
3167  }
3168  // otherwise use mpi
3169  else
3170  {
3171 #ifdef OOMPH_HAS_MPI
3172  // my rank
3173  unsigned my_rank = this->distribution_pt()->communicator_pt()->my_rank();
3174 
3175  // the number of processors
3176  unsigned nproc = this->distribution_pt()->communicator_pt()->nproc();
3177 
3178  // build the vectors
3179  s.resize(nblock);
3180  for (unsigned b = 0; b < nblock; b++)
3181  {
3182  const unsigned required_block = block_vec_number[b];
3183  s[b].build(Internal_block_distribution_pt[required_block],0.0);
3184  }
3185 
3186  // determine the maximum number of rows to be sent or recv
3187  // and determine the number of blocks each processor will send and recv
3188  // communication for
3189  Vector<int> nblock_send(nproc,0);
3190  Vector<int> nblock_recv(nproc,0);
3191  unsigned max_n_send_or_recv = 0;
3192  for (unsigned p = 0; p < nproc; p++)
3193  {
3194  for (unsigned b = 0; b < nblock; b++)
3195  {
3196  const unsigned required_block = block_vec_number[b];
3197  max_n_send_or_recv =
3198  std::max(max_n_send_or_recv,Nrows_to_send_for_get_block(required_block,p));
3199  max_n_send_or_recv =
3200  std::max(max_n_send_or_recv,Nrows_to_recv_for_get_block(required_block,p));
3201  if (Nrows_to_send_for_get_block(required_block,p) > 0)
3202  {
3203  nblock_send[p]++;
3204  }
3205  if (Nrows_to_recv_for_get_block(required_block,p) > 0)
3206  {
3207  nblock_recv[p]++;
3208  }
3209  }
3210  }
3211 
3212  // create a vectors of 1s the size of the nblock for the mpi indexed
3213  // data types
3214  int* block_lengths = new int[max_n_send_or_recv];
3215  for (unsigned i = 0; i < max_n_send_or_recv; i++)
3216  {
3217  block_lengths[i] = 1;
3218  }
3219 
3220  // perform the sends and receives
3221  Vector<MPI_Request> requests;
3222  for (unsigned p = 0; p < nproc; p++)
3223  {
3224  // send and recv with other processors
3225  if (p != my_rank)
3226  {
3227  // send
3228  if (nblock_send[p] > 0)
3229  {
3230  // create the datatypes vector
3231  MPI_Datatype block_send_types[nblock_send[p]];
3232 
3233  // create the datatypes
3234  unsigned ptr = 0;
3235  for (unsigned b = 0; b < nblock; b++)
3236  {
3237  const unsigned required_block = block_vec_number[b];
3238 
3239  if (Nrows_to_send_for_get_block(required_block,p) > 0)
3240  {
3241  MPI_Type_indexed(Nrows_to_send_for_get_block(required_block,p),block_lengths,
3242  Rows_to_send_for_get_block(required_block,p),MPI_DOUBLE,
3243  &block_send_types[ptr]);
3244  MPI_Type_commit(&block_send_types[ptr]);
3245  ptr++;
3246  }
3247  }
3248 
3249  // compute the displacements and lengths
3250  MPI_Aint displacements[nblock_send[p]];
3251  int lengths[nblock_send[p]];
3252  for (int i = 0; i < nblock_send[p]; i++)
3253  {
3254  lengths[i] = 1;
3255  displacements[i] = 0;
3256  }
3257 
3258  // build the final datatype
3259  MPI_Datatype type_send;
3260  MPI_Type_struct(nblock_send[p],lengths,displacements,
3261  block_send_types,&type_send);
3262  MPI_Type_commit(&type_send);
3263 
3264  // send
3265  MPI_Request send_req;
3266  MPI_Isend(const_cast<double*>(v.values_pt()),1,type_send,p,0,
3267  this->distribution_pt()->communicator_pt()->mpi_comm(),
3268  &send_req);
3269  MPI_Type_free(&type_send);
3270  for (int i = 0; i < nblock_send[p]; i++)
3271  {
3272  MPI_Type_free(&block_send_types[i]);
3273  }
3274  requests.push_back(send_req);
3275  }
3276 
3277  // recv
3278  if (nblock_recv[p] > 0)
3279  {
3280  // create the datatypes vector
3281  MPI_Datatype block_recv_types[nblock_recv[p]];
3282 
3283  // and the displacements
3284  MPI_Aint displacements[nblock_recv[p]];
3285 
3286  // and the lengths
3287  int lengths[nblock_recv[p]];
3288 
3289  // all displacements are computed relative to s[0] values
3290  MPI_Aint displacements_base;
3291  MPI_Address(s[0].values_pt(),&displacements_base);
3292 
3293  // now build
3294  unsigned ptr = 0;
3295  for (unsigned b = 0; b < nblock; b++)
3296  {
3297  const unsigned required_block = block_vec_number[b];
3298 
3299  if (Nrows_to_recv_for_get_block(required_block,p) > 0)
3300  {
3301  MPI_Type_indexed(Nrows_to_recv_for_get_block(required_block,p),block_lengths,
3302  Rows_to_recv_for_get_block(required_block,p),MPI_DOUBLE,
3303  &block_recv_types[ptr]);
3304  MPI_Type_commit(&block_recv_types[ptr]);
3305  MPI_Address(s[b].values_pt(),&displacements[ptr]);
3306  displacements[ptr] -= displacements_base;
3307  lengths[ptr] = 1;
3308  ptr++;
3309  }
3310  }
3311 
3312  // build the final data type
3313  MPI_Datatype type_recv;
3314  MPI_Type_struct(nblock_recv[p],lengths,displacements,
3315  block_recv_types,&type_recv);
3316  MPI_Type_commit(&type_recv);
3317 
3318  // recv
3319  MPI_Request recv_req;
3320  MPI_Irecv(s[0].values_pt(),1,type_recv,p,0,
3321  this->distribution_pt()->communicator_pt()->mpi_comm(),
3322  &recv_req);
3323  MPI_Type_free(&type_recv);
3324  for (int i = 0; i < nblock_recv[p]; i++)
3325  {
3326  MPI_Type_free(&block_recv_types[i]);
3327  }
3328  requests.push_back(recv_req);
3329  }
3330  }
3331 
3332  // communicate with self
3333  else
3334  {
3335  const double* v_values_pt = v.values_pt();
3336  for (unsigned b = 0; b < nblock; b++)
3337  {
3338  const unsigned required_block = block_vec_number[b];
3339 
3340  double* w_values_pt = s[b].values_pt();
3341  for (unsigned i = 0; i < Nrows_to_send_for_get_block(required_block,p); i++)
3342  {
3343  w_values_pt[Rows_to_recv_for_get_block(required_block,p)[i]] =
3344  v_values_pt[Rows_to_send_for_get_block(required_block,p)[i]];
3345  }
3346  }
3347  }
3348  }
3349 
3350  // and then just wait
3351  unsigned c = requests.size();
3352  Vector<MPI_Status> stat(c);
3353  if (c)
3354  {
3355  MPI_Waitall(c,&requests[0],&stat[0]);
3356  }
3357  delete[] block_lengths;
3358 
3359 #else
3360  // throw error
3361  std::ostringstream error_message;
3362  error_message << "The preconditioner is distributed and on more than one "
3363  << "processor. MPI is required.";
3364  throw OomphLibError(error_message.str(),
3365  OOMPH_CURRENT_FUNCTION,
3366  OOMPH_EXCEPTION_LOCATION);
3367 #endif
3368  }
3369  }
3370 
3371  //============================================================================
3372  /// \short A helper function, takes the naturally ordered vector and
3373  /// rearranges it into a vector of sub vectors corresponding to the blocks,
3374  /// so s[b][i] contains the i-th entry in the vector associated with block b.
3375  /// The block_vec_number indicates which blocks we want.
3376  /// These blocks and vectors are those corresponding to the internal blocks.
3377  /// Note: If the preconditioner is a subsidiary preconditioner then only the
3378  /// sub-vectors associated with the blocks of the subsidiary preconditioner
3379  /// will be included. Hence the length of v is master_nrow() whereas the
3380  /// total length of the s vectors is the sum of the Nrow of the sub vectors.
3381  /// This is simply a wrapper around the other internal_get_block_vectors(...)
3382  /// function with the identity block_vec_number vector.
3383  //============================================================================
3384  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
3386  const DoubleVector& v, Vector<DoubleVector >& s) const
3387  {
3388  // Number of block types
3389  const unsigned nblock = this->internal_nblock_types();
3390  Vector<unsigned> block_vec_number(nblock,0);
3391  for (unsigned b = 0; b < nblock; b++)
3392  {
3393  block_vec_number[b] = b;
3394  }
3395 
3396  internal_get_block_vectors(block_vec_number,v,s);
3397  }
3398 
3399  //============================================================================
3400  /// \short Takes the vector of block vectors, s, and copies its entries into
3401  /// the naturally ordered vector, v. If this is a subsidiary block
3402  /// preconditioner only those entries in v that are associated with its
3403  /// blocks are affected. The block_vec_number indicates which block the
3404  /// vectors in s came from. The block number corresponds to the block
3405  /// numbers in this preconditioner.
3406  //============================================================================
3407  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
3408  return_block_vectors(const Vector<unsigned> & block_vec_number,
3409  const Vector<DoubleVector >& s, DoubleVector& v) const
3410  {
3411 #ifdef PARANOID
3412 
3413  // Check if v is built.
3414  if (!v.built())
3415  {
3416  std::ostringstream err_msg;
3417  err_msg << "The distribution of the global vector v must be setup.";
3418  throw OomphLibError(err_msg.str(),
3419  OOMPH_CURRENT_FUNCTION,
3420  OOMPH_EXCEPTION_LOCATION);
3421  }
3422 
3423  // v must have the same distribution as the upper-most master block
3424  // preconditioner, since the upper-most master block preconditioner
3425  // should have the same distribution as the matrix pointed to
3426  // by matrix_pt().
3427  if (*(v.distribution_pt()) != *(this->master_distribution_pt()))
3428  {
3429  std::ostringstream err_msg;
3430  err_msg << "The distribution of the global vector v must match the "
3431  << " specified master_distribution_pt(). \n"
3432  << "i.e. Distribution_pt in the master preconditioner";
3433  throw OomphLibError(err_msg.str(),
3434  OOMPH_CURRENT_FUNCTION,
3435  OOMPH_EXCEPTION_LOCATION);
3436  }
3437 
3438  // Check if the number of vectors in s is the same as the number of block
3439  // numbers described in block_vec_number.
3440  const unsigned para_block_vec_number_size = block_vec_number.size();
3441  const unsigned para_s_size = s.size();
3442  if(para_block_vec_number_size != para_s_size)
3443  {
3444  std::ostringstream err_msg;
3445  err_msg << "block_vec_number.size() is " << para_block_vec_number_size
3446  << "\n."
3447  << "s.size() is " << para_s_size << ".\n"
3448  << "But they must be the same size!\n";
3449  throw OomphLibError(err_msg.str(),
3450  OOMPH_CURRENT_FUNCTION,
3451  OOMPH_EXCEPTION_LOCATION);
3452  }
3453 
3454  // Check to see if there are more blocks defined in the block_vec_number
3455  // vector than the number of block types. This is not allowed.
3456  const unsigned para_n_block = nblock_types();
3457  if(para_block_vec_number_size > para_n_block)
3458  {
3459  std::ostringstream err_msg;
3460  err_msg << "Trying to return " << para_block_vec_number_size
3461  << " block vectors.\n"
3462  << "But there are only " << para_n_block << " block types.\n";
3463  throw OomphLibError(err_msg.str(),
3464  OOMPH_CURRENT_FUNCTION,
3465  OOMPH_EXCEPTION_LOCATION);
3466  }
3467 
3468  // Check if any block numbers defined in block_vec_number is equal to or
3469  // greater than the number of block types.
3470  // E.g. if there are 5 block types, we can only have block numbers:
3471  // 0, 1, 2, 3 and 4.
3472  for (unsigned b = 0; b < para_block_vec_number_size; b++)
3473  {
3474  const unsigned para_required_block = block_vec_number[b];
3475  if(para_required_block > para_n_block)
3476  {
3477  std::ostringstream err_msg;
3478  err_msg << "block_vec_number[" << b << "] is " << para_required_block
3479  << ".\n"
3480  << "But there are only " << para_n_block << " block types.\n";
3481  throw OomphLibError(err_msg.str(),
3482  OOMPH_CURRENT_FUNCTION,
3483  OOMPH_EXCEPTION_LOCATION);
3484 
3485  }
3486  }
3487 
3488  // Check that no block number is inserted twice.
3489  std::set<unsigned> para_set;
3490  for (unsigned b = 0; b < para_block_vec_number_size; b++)
3491  {
3492  std::pair<std::set<unsigned>::iterator,bool> para_set_ret;
3493  para_set_ret = para_set.insert(block_vec_number[b]);
3494 
3495  if(!para_set_ret.second)
3496  {
3497  std::ostringstream err_msg;
3498  err_msg << "Error: the block number "
3499  << block_vec_number[b]
3500  << " appears twice.\n";
3501  throw OomphLibError(err_msg.str(),
3502  OOMPH_CURRENT_FUNCTION,
3503  OOMPH_EXCEPTION_LOCATION);
3504  }
3505  }
3506 
3507  // Check to see that all the vectors in s are built
3508  // (since we are trying to return them).
3509  for (unsigned b = 0; b < para_block_vec_number_size; b++)
3510  {
3511  if(!s[b].built())
3512  {
3513  std::ostringstream err_msg;
3514  err_msg << "The distribution of the block vector s["
3515  << b << "] must be setup.\n";
3516  throw OomphLibError(err_msg.str(),
3517  OOMPH_CURRENT_FUNCTION,
3518  OOMPH_EXCEPTION_LOCATION);
3519  }
3520  }
3521 
3522  // Since these are built, we check that the distributions are correct.
3523  // This are incorrect if the block numbers in block_vec_number and
3524  // the vectors in s does not match.
3525  for (unsigned b = 0; b < para_block_vec_number_size; b++)
3526  {
3527  if (*(s[b].distribution_pt()) !=
3528  *(Block_distribution_pt[block_vec_number[b]]))
3529  {
3530  std::ostringstream error_message;
3531  error_message << "The distribution of the block vector " << b
3532  << " must match the"
3533  << " specified distribution at "
3534  << "Block_distribution_pt["
3535  << block_vec_number[b] << "].\n"
3536  << "The distribution of the Block_distribution_pt is determined by\n"
3537  << "the vector block_vec_number. Perhaps it is incorrect?\n";
3538  throw OomphLibError(error_message.str(),
3539  OOMPH_CURRENT_FUNCTION,
3540  OOMPH_EXCEPTION_LOCATION);
3541  }
3542  }
3543 #endif
3544 
3545  // Number of blocks to get.
3546  const unsigned n_block = block_vec_number.size();
3547 
3548  // Each block is made of dof types. We get the most fine grain dof types.
3549  // Most fine grain in the sense that these are the dof types that belongs
3550  // in this block before any coarsening of dof types has taken place.
3551  // The ordering of the dof types matters, this is handled properly when
3552  // creating the Block_to_dof_map_fine vector and must be respected here.
3553  // I.e. we cannot arbitrarily insert dof types (even if they are correct)
3554  // in the vector most_fine_grain_dof.
3555  Vector<unsigned> most_fine_grain_dof;
3556  for (unsigned b = 0; b < n_block; b++)
3557  {
3558  const unsigned mapped_b = block_vec_number[b];
3559 
3560  most_fine_grain_dof.insert(most_fine_grain_dof.end(),
3561  Block_to_dof_map_fine[mapped_b].begin(),
3562  Block_to_dof_map_fine[mapped_b].end());
3563  }
3564 
3565  // Split all the blocks into it's most fine grain dof vector.
3566  Vector<DoubleVector> dof_vector(most_fine_grain_dof.size());
3567 
3568  unsigned offset = 0;
3569 
3570  // Perform the splitting for each block.
3571  for (unsigned b = 0; b < n_block; b++)
3572  {
3573  // The actual block number.
3574  const unsigned mapped_b = block_vec_number[b];
3575 
3576  // How many most fine grain dof types are associated with this block?
3577  const unsigned ndof = Block_to_dof_map_fine[mapped_b].size();
3578 
3579  if(ndof == 1)
3580  // No need to split, just copy.
3581  {
3582  dof_vector[offset] = s[b];
3583  }
3584  else
3585  // Need to split s[b] into it's most fine grain dof vectors
3586  {
3587  // To store pointers to the dof vectors associated with this block.
3588  Vector<DoubleVector*> tmp_dof_vector_pt(ndof,0);
3589 
3590  for (unsigned d = 0; d < ndof; d++)
3591  {
3592  const unsigned offset_plus_d = offset + d;
3593 
3594  // build the dof vector.
3595  dof_vector[offset_plus_d].build(
3596  Internal_block_distribution_pt[
3597  most_fine_grain_dof[offset_plus_d]]);
3598 
3599  // Store the pointer.
3600  tmp_dof_vector_pt[d] = &dof_vector[offset_plus_d];
3601  }
3602 
3603  // Split without communication.
3605  s[b],tmp_dof_vector_pt);
3606  }
3607 
3608  // Update the offset!
3609  offset += ndof;
3610  }
3611 
3612  // Return the block vectors all in one go.
3613  internal_return_block_vectors(most_fine_grain_dof,
3614  dof_vector,
3615  v);
3616  } // return_block_vectors(...)
3617 
3618 
3619  //============================================================================
3620  /// \short Takes the vector of block vectors, s, and copies its entries into
3621  /// the naturally ordered vector, v. If this is a subsidiary block
3622  /// preconditioner only those entries in v that are associated with its
3623  /// blocks are affected. The block_vec_number indicates which block the
3624  /// vectors in s came from. The block number corresponds to the block
3625  /// numbers in this preconditioner.
3626  /// This is simply a wrapper around the other return_block_vectors(...)
3627  /// function where the block_vec_number Vector is the identity, i.e.
3628  /// block_vec_number is [0, 1, ..., nblock_types - 1].
3629  //============================================================================
3630  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
3632  {
3633  // The number of block types in this preconditioner.
3634  const unsigned n_block = nblock_types();
3635 
3636  // Create the identity vector.
3637  Vector<unsigned>required_block(n_block,0);
3638  for (unsigned i = 0; i < n_block; i++)
3639  {
3640  required_block[i] = i;
3641  }
3642 
3643  // Call the other return_block_vectors function which does the work.
3644  return_block_vectors(required_block,s,v);
3645  } // return_block_vectors(...)
3646 
3647  //============================================================================
3648  /// \short Takes the naturally ordered vector and
3649  /// rearranges it into a vector of sub vectors corresponding to the blocks,
3650  /// so s[b][i] contains the i-th entry in the vector associated with block b.
3651  /// The block_vec_number indicates which blocks we want.
3652  /// These blocks and vectors are those corresponding to the internal blocks.
3653  /// Note: If the preconditioner is a subsidiary preconditioner then only the
3654  /// sub-vectors associated with the blocks of the subsidiary preconditioner
3655  /// will be included. Hence the length of v is master_nrow() whereas the
3656  /// total length of the s vectors is the sum of the Nrow of the sub vectors.
3657  //============================================================================
3658  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
3660  const Vector<unsigned>& block_vec_number,
3661  const Vector<DoubleVector >& s, DoubleVector& v) const
3662  {
3663  // the number of blocks
3664  const unsigned nblock = block_vec_number.size();
3665 
3666 #ifdef PARANOID
3667  if (!v.built())
3668  {
3669  std::ostringstream error_message;
3670  error_message << "The distribution of the global vector v must be setup.";
3671  throw OomphLibError(error_message.str(),
3672  OOMPH_CURRENT_FUNCTION,
3673  OOMPH_EXCEPTION_LOCATION);
3674  }
3675  if (*(v.distribution_pt()) != *(this->master_distribution_pt()))
3676  {
3677  std::ostringstream error_message;
3678  error_message << "The distribution of the global vector v must match the "
3679  << " specified master_distribution_pt(). \n"
3680  << "i.e. Distribution_pt in the master preconditioner";
3681  throw OomphLibError(error_message.str(),
3682  OOMPH_CURRENT_FUNCTION,
3683  OOMPH_EXCEPTION_LOCATION);
3684  }
3685  for (unsigned b = 0; b < nblock; b++)
3686  {
3687  if (!s[b].built())
3688  {
3689  std::ostringstream error_message;
3690  error_message << "The distribution of the block vector " << b
3691  << " must be setup.";
3692  throw OomphLibError(error_message.str(),
3693  OOMPH_CURRENT_FUNCTION,
3694  OOMPH_EXCEPTION_LOCATION);
3695  }
3696  const unsigned required_block = block_vec_number[b];
3697  if (*(s[b].distribution_pt()) != *(Internal_block_distribution_pt[required_block]))
3698  {
3699  std::ostringstream error_message;
3700  error_message << "The distribution of the block vector " << b
3701  << " must match the"
3702  << " specified distribution at Internal_block_distribution_pt["
3703  << b << "]";
3704  throw OomphLibError(error_message.str(),
3705  OOMPH_CURRENT_FUNCTION,
3706  OOMPH_EXCEPTION_LOCATION);
3707  }
3708  }
3709 #endif
3710 
3711  // if + only one processor
3712  // + more than one processor but matrix_pt is not distributed
3713  // then use the serial get_block method
3714  if (this->distribution_pt()->communicator_pt()->nproc() == 1 ||
3715  !this->distribution_pt()->distributed())
3716  {
3717  double* v_pt = v.values_pt();
3718  for (unsigned b = 0; b < nblock; b++)
3719  {
3720  const unsigned required_block = block_vec_number[b];
3721 
3722  const double* s_pt = s[b].values_pt();
3723  unsigned nrow = this->internal_block_dimension(required_block);
3724  for (unsigned i = 0; i < nrow; i++)
3725  {
3726  v_pt[this->Global_index[required_block][i]] = s_pt[i];
3727  }
3728  }
3729  }
3730  // otherwise use mpi
3731  else
3732  {
3733 #ifdef OOMPH_HAS_MPI
3734 
3735  // my rank
3736  unsigned my_rank = this->distribution_pt()->communicator_pt()->my_rank();
3737 
3738  // the number of processors
3739  unsigned nproc = this->distribution_pt()->communicator_pt()->nproc();
3740 
3741  // determine the maximum number of rows to be sent or recv
3742  // and determine the number of blocks each processor will send and recv
3743  // communication for
3744  Vector<int> nblock_send(nproc,0);
3745  Vector<int> nblock_recv(nproc,0);
3746  unsigned max_n_send_or_recv = 0;
3747  for (unsigned p = 0; p < nproc; p++)
3748  {
3749  for (unsigned b = 0; b < nblock; b++)
3750  {
3751  const unsigned required_block = block_vec_number[b];
3752 
3753  max_n_send_or_recv =
3754  std::max(max_n_send_or_recv,Nrows_to_send_for_get_block(required_block,p));
3755  max_n_send_or_recv =
3756  std::max(max_n_send_or_recv,Nrows_to_recv_for_get_block(required_block,p));
3757  if (Nrows_to_send_for_get_block(required_block,p) > 0)
3758  {
3759  nblock_recv[p]++;
3760  }
3761  if (Nrows_to_recv_for_get_block(required_block,p) > 0)
3762  {
3763  nblock_send[p]++;
3764  }
3765  }
3766  }
3767 
3768  // create a vectors of 1s the size of the nblock for the mpi indexed
3769  // data types
3770  int* block_lengths = new int[max_n_send_or_recv];
3771  for (unsigned i = 0; i < max_n_send_or_recv; i++)
3772  {
3773  block_lengths[i] = 1;
3774  }
3775 
3776  // perform the sends and receives
3777  Vector<MPI_Request> requests;
3778  for (unsigned p = 0; p < nproc; p++)
3779  {
3780  // send and recv with other processors
3781  if (p != my_rank)
3782  {
3783  // recv
3784  if (nblock_recv[p] > 0)
3785  {
3786  // create the datatypes vector
3787  MPI_Datatype block_recv_types[nblock_recv[p]];
3788 
3789  // create the datatypes
3790  unsigned ptr = 0;
3791  for (unsigned b = 0; b < nblock; b++)
3792  {
3793  const unsigned required_block = block_vec_number[b];
3794 
3795  if (Nrows_to_send_for_get_block(required_block,p) > 0)
3796  {
3797  MPI_Type_indexed(Nrows_to_send_for_get_block(required_block,p),block_lengths,
3798  Rows_to_send_for_get_block(required_block,p),MPI_DOUBLE,
3799  &block_recv_types[ptr]);
3800  MPI_Type_commit(&block_recv_types[ptr]);
3801  ptr++;
3802  }
3803  }
3804 
3805  // compute the displacements and lengths
3806  MPI_Aint displacements[nblock_recv[p]];
3807  int lengths[nblock_recv[p]];
3808  for (int i = 0; i < nblock_recv[p]; i++)
3809  {
3810  lengths[i] = 1;
3811  displacements[i] = 0;
3812  }
3813 
3814  // build the final datatype
3815  MPI_Datatype type_recv;
3816  MPI_Type_struct(nblock_recv[p],lengths,displacements,
3817  block_recv_types,&type_recv);
3818  MPI_Type_commit(&type_recv);
3819 
3820  // recv
3821  MPI_Request recv_req;
3822  MPI_Irecv(v.values_pt(),1,type_recv,p,0,
3823  this->distribution_pt()->communicator_pt()->mpi_comm(),
3824  &recv_req);
3825  MPI_Type_free(&type_recv);
3826  for (int i = 0; i < nblock_recv[p]; i++)
3827  {
3828  MPI_Type_free(&block_recv_types[i]);
3829  }
3830  requests.push_back(recv_req);
3831  }
3832 
3833  // send
3834  if (nblock_send[p] > 0)
3835  {
3836  // create the datatypes vector
3837  MPI_Datatype block_send_types[nblock_send[p]];
3838 
3839  // and the displacements
3840  MPI_Aint displacements[nblock_send[p]];
3841 
3842  // and the lengths
3843  int lengths[nblock_send[p]];
3844 
3845  // all displacements are computed relative to s[0] values
3846  MPI_Aint displacements_base;
3847  MPI_Address(const_cast<double*>(s[0].values_pt()),
3848  &displacements_base);
3849 
3850  // now build
3851  unsigned ptr = 0;
3852  for (unsigned b = 0; b < nblock; b++)
3853  {
3854  const unsigned required_block = block_vec_number[b];
3855 
3856  if (Nrows_to_recv_for_get_block(required_block,p) > 0)
3857  {
3858  MPI_Type_indexed(Nrows_to_recv_for_get_block(required_block,p),block_lengths,
3859  Rows_to_recv_for_get_block(required_block,p),MPI_DOUBLE,
3860  &block_send_types[ptr]);
3861  MPI_Type_commit(&block_send_types[ptr]);
3862  MPI_Address(const_cast<double*>(s[b].values_pt()),
3863  &displacements[ptr]);
3864  displacements[ptr] -= displacements_base;
3865  lengths[ptr] = 1;
3866  ptr++;
3867  }
3868  }
3869 
3870  // build the final data type
3871  MPI_Datatype type_send;
3872  MPI_Type_struct(nblock_send[p],lengths,displacements,
3873  block_send_types,&type_send);
3874  MPI_Type_commit(&type_send);
3875 
3876  // send
3877  MPI_Request send_req;
3878  MPI_Isend(const_cast<double*>(s[0].values_pt()),1,type_send,p,0,
3879  this->distribution_pt()->communicator_pt()->mpi_comm(),
3880  &send_req);
3881  MPI_Type_free(&type_send);
3882  for (int i = 0; i < nblock_send[p]; i++)
3883  {
3884  MPI_Type_free(&block_send_types[i]);
3885  }
3886  requests.push_back(send_req);
3887  }
3888  }
3889 
3890  // communicate wih self
3891  else
3892  {
3893  double* v_values_pt = v.values_pt();
3894  for (unsigned b = 0; b < nblock; b++)
3895  {
3896  const unsigned required_block = block_vec_number[b];
3897 
3898  const double* w_values_pt = s[b].values_pt();
3899  for (unsigned i = 0; i < Nrows_to_send_for_get_block(required_block,p); i++)
3900  {
3901  v_values_pt[Rows_to_send_for_get_block(required_block,p)[i]] =
3902  w_values_pt[Rows_to_recv_for_get_block(required_block,p)[i]];
3903 
3904  }
3905  }
3906  }
3907  }
3908 
3909  // and then just wait
3910  unsigned c = requests.size();
3911  Vector<MPI_Status> stat(c);
3912  if (c)
3913  {
3914  MPI_Waitall(c,&requests[0],&stat[0]);
3915  }
3916  delete[] block_lengths;
3917 
3918 #else
3919  // throw error
3920  std::ostringstream error_message;
3921  error_message << "The preconditioner is distributed and on more than one "
3922  << "processor. MPI is required.";
3923  throw OomphLibError(error_message.str(),
3924  OOMPH_CURRENT_FUNCTION,
3925  OOMPH_EXCEPTION_LOCATION);
3926 #endif
3927  }
3928  }
3929 
3930  //============================================================================
3931  /// \short A helper function, takes the naturally ordered vector and
3932  /// rearranges it into a vector of sub vectors corresponding to the blocks,
3933  /// so s[b][i] contains the i-th entry in the vector associated with block b.
3934  /// The block_vec_number indicates which blocks we want.
3935  /// These blocks and vectors are those corresponding to the internal blocks.
3936  /// Note: If the preconditioner is a subsidiary preconditioner then only the
3937  /// sub-vectors associated with the blocks of the subsidiary preconditioner
3938  /// will be included. Hence the length of v is master_nrow() whereas the
3939  /// total length of the s vectors is the sum of the Nrow of the sub vectors.
3940  /// This is simply a wrapper around the other internal_get_block_vectors(...)
3941  /// function with the identity block_vec_number vector.
3942  //============================================================================
3943  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
3945  const Vector<DoubleVector >& s, DoubleVector& v) const
3946  {
3947  // the number of blocks
3948  const unsigned nblock = this->internal_nblock_types();
3949  Vector<unsigned> block_vec_number(nblock,0);
3950  for (unsigned b = 0; b < nblock; b++)
3951  {
3952  block_vec_number[b] = b;
3953  }
3954 
3955  internal_return_block_vectors(block_vec_number,s,v);
3956  }
3957 
3958  //============================================================================
3959  /// \short A helper function, takes the naturally ordered vector, v,
3960  /// and extracts the n-th block vector, b.
3961  /// Here n is the block number in the current preconditioner.
3962  /// NOTE: The ordering of the vector b is the same as the
3963  /// ordering of the block matrix from internal_get_block(...).
3964  //============================================================================
3965  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
3966  internal_get_block_vector(const unsigned& b,
3967  const DoubleVector& v,
3968  DoubleVector& w)
3969  const
3970  {
3971 #ifdef PARANOID
3972  // the number of blocks
3973  const unsigned n_blocks = this->internal_nblock_types();
3974 
3975  // paranoid check that block i is in this block preconditioner
3976  if (b >= n_blocks)
3977  {
3978  std::ostringstream error_message;
3979  error_message << "Requested block vector " << b
3980  << ", however this preconditioner has internal_nblock_types() "
3981  << "= " << internal_nblock_types() << std::endl;
3982  throw OomphLibError(error_message.str(),
3983  OOMPH_CURRENT_FUNCTION,
3984  OOMPH_EXCEPTION_LOCATION);
3985  }
3986  if (!v.built())
3987  {
3988  std::ostringstream error_message;
3989  error_message << "The distribution of the global vector v must be setup.";
3990  throw OomphLibError(error_message.str(),
3991  OOMPH_CURRENT_FUNCTION,
3992  OOMPH_EXCEPTION_LOCATION);
3993  }
3994  if (*(v.distribution_pt()) != *(this->master_distribution_pt()))
3995  {
3996  std::ostringstream error_message;
3997  error_message << "The distribution of the global vector v must match the "
3998  << " specified master_distribution_pt(). \n"
3999  << "i.e. Distribution_pt in the master preconditioner";
4000  throw OomphLibError(error_message.str(),
4001  OOMPH_CURRENT_FUNCTION,
4002  OOMPH_EXCEPTION_LOCATION);
4003  }
4004 #endif
4005 
4006  // rebuild the block vector
4007  w.build(Internal_block_distribution_pt[b],0.0);
4008 
4009  // if + only one processor
4010  // + more than one processor but matrix_pt is not distributed
4011  // then use the serial get_block method
4012  if (this->distribution_pt()->communicator_pt()->nproc() == 1 ||
4013  !this->distribution_pt()->distributed())
4014  {
4015  double* w_pt = w.values_pt();
4016  const double* v_pt = v.values_pt();
4017  unsigned n_row = w.nrow();
4018  for (unsigned i = 0; i < n_row; i++)
4019  {
4020  w_pt[i] = v_pt[this->Global_index[b][i]];
4021  }
4022  }
4023  // otherwise use mpi
4024  else
4025  {
4026 #ifdef OOMPH_HAS_MPI
4027 
4028  // my rank
4029  unsigned my_rank = this->distribution_pt()->communicator_pt()->my_rank();
4030 
4031  // the number of processors
4032  unsigned nproc = this->distribution_pt()->communicator_pt()->nproc();
4033 
4034  // determine the maximum number of rows to be sent or recv
4035  unsigned max_n_send_or_recv = 0;
4036  for (unsigned p = 0; p < nproc; p++)
4037  {
4038  max_n_send_or_recv =
4039  std::max(max_n_send_or_recv,Nrows_to_send_for_get_block(b,p));
4040  max_n_send_or_recv =
4041  std::max(max_n_send_or_recv,Nrows_to_recv_for_get_block(b,p));
4042  }
4043 
4044  // create a vectors of 1s (the size of the nblock for the mpi indexed
4045  // data types
4046  int* block_lengths = new int[max_n_send_or_recv];
4047  for (unsigned i = 0; i < max_n_send_or_recv; i++)
4048  {
4049  block_lengths[i] = 1;
4050  }
4051 
4052  // perform the sends and receives
4053  Vector<MPI_Request> requests;
4054  for (unsigned p = 0; p < nproc; p++)
4055  {
4056  // send and recv with other processors
4057  if (p != my_rank)
4058  {
4059  if (Nrows_to_send_for_get_block(b,p) > 0)
4060  {
4061  // create the send datatype
4062  MPI_Datatype type_send;
4063  MPI_Type_indexed(Nrows_to_send_for_get_block(b,p),block_lengths,
4064  Rows_to_send_for_get_block(b,p),MPI_DOUBLE,
4065  &type_send);
4066  MPI_Type_commit(&type_send);
4067 
4068  // send
4069  MPI_Request send_req;
4070  MPI_Isend(const_cast<double*>(v.values_pt()),1,type_send,p,0,
4071  this->distribution_pt()->communicator_pt()->mpi_comm(),
4072  &send_req);
4073  MPI_Type_free(&type_send);
4074  requests.push_back(send_req);
4075  }
4076 
4077  if (Nrows_to_recv_for_get_block(b,p) > 0)
4078  {
4079  // create the recv datatype
4080  MPI_Datatype type_recv;
4081  MPI_Type_indexed(Nrows_to_recv_for_get_block(b,p),block_lengths,
4082  Rows_to_recv_for_get_block(b,p),MPI_DOUBLE,
4083  &type_recv);
4084  MPI_Type_commit(&type_recv);
4085 
4086  // recv
4087  MPI_Request recv_req;
4088  MPI_Irecv(w.values_pt(),1,type_recv,p,0,
4089  this->distribution_pt()->communicator_pt()->mpi_comm(),
4090  &recv_req);
4091  MPI_Type_free(&type_recv);
4092  requests.push_back(recv_req);
4093  }
4094  }
4095 
4096  // communicate with self
4097  else
4098  {
4099  double* w_values_pt = w.values_pt();
4100  const double* v_values_pt = v.values_pt();
4101  for (unsigned i = 0; i < Nrows_to_send_for_get_block(b,p); i++)
4102  {
4103  w_values_pt[Rows_to_recv_for_get_block(b,p)[i]] =
4104  v_values_pt[Rows_to_send_for_get_block(b,p)[i]];
4105  }
4106  }
4107  }
4108 
4109  // and then just wait
4110  unsigned c = requests.size();
4111  Vector<MPI_Status> stat(c);
4112  if (c)
4113  {
4114  MPI_Waitall(c,&requests[0],&stat[0]);
4115  }
4116  delete[] block_lengths;
4117 
4118 #else
4119  // throw error
4120  std::ostringstream error_message;
4121  error_message << "The preconditioner is distributed and on more than one "
4122  << "processor. MPI is required.";
4123  throw OomphLibError(error_message.str(),
4124  OOMPH_CURRENT_FUNCTION,
4125  OOMPH_EXCEPTION_LOCATION);
4126 #endif
4127  }
4128  }
4129 
4130  //============================================================================
4131  /// \short Takes the naturally ordered vector, v and returns the n-th
4132  /// block vector, b. Here n is the block number in the current
4133  /// preconditioner.
4134  //============================================================================
4135  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
4136  get_block_vector(const unsigned& b, const DoubleVector& v, DoubleVector& w)
4137  const
4138  {
4139 #ifdef PARANOID
4140  // the number of blocks
4141  const unsigned para_n_blocks = nblock_types();
4142 
4143  // paranoid check that block i is in this block preconditioner
4144  if (b >= para_n_blocks)
4145  {
4146  std::ostringstream err_msg;
4147  err_msg << "Requested block vector " << b
4148  << ", however this preconditioner has only "
4149  << para_n_blocks << " block types" << ".\n";
4150  throw OomphLibError(err_msg.str(),
4151  OOMPH_CURRENT_FUNCTION,
4152  OOMPH_EXCEPTION_LOCATION);
4153  }
4154 
4155  if (!v.built())
4156  {
4157  std::ostringstream err_msg;
4158  err_msg << "The distribution of the global vector v must be setup.";
4159  throw OomphLibError(err_msg.str(),
4160  OOMPH_CURRENT_FUNCTION,
4161  OOMPH_EXCEPTION_LOCATION);
4162  }
4163  if (*(v.distribution_pt()) != *(this->master_distribution_pt()))
4164  {
4165  std::ostringstream err_msg;
4166  err_msg << "The distribution of the global vector v must match the "
4167  << " specified master_distribution_pt(). \n"
4168  << "i.e. Distribution_pt in the master preconditioner";
4169  throw OomphLibError(err_msg.str(),
4170  OOMPH_CURRENT_FUNCTION,
4171  OOMPH_EXCEPTION_LOCATION);
4172  }
4173 #endif
4174 
4175  // Recall that, the relationship between the external blocks and the external
4176  // dof types, as seen by the preconditioner writer is stored in the mapping
4177  // Block_to_dof_map_coarse.
4178  //
4179  // However, each dof type could have been coarsened! The relationship
4180  // between the dof types of this preconditioner and the parent preconditioner
4181  // is stored in the mapping Doftype_coarsen_map_coarse. The dof numbers in
4182  // this map is relative to this preconditioner.
4183  //
4184  // Finally, the relationship between the dof types of this preconditioner
4185  // and the most fine grain dof types is stored in the mapping
4186  // Doftype_coarsen_map_fine. Again, the dof numbers in this map is relative
4187  // to this preconditioner.
4188  //
4189  // Furthermore, we note that concatenation of vectors without communication
4190  // is associative, but not commutative. I.e.
4191  // (V1+V2)+V3 = V1 + (V2 + V3), where + is concatenation without
4192  // communication.
4193  //
4194  // So all we need is the vectors listed in the correct order.
4195  //
4196  // We need only Block_to_dof_map_coarse to tell us which external dof types
4197  // are in this block, then Doftype_coarsen_map_fine to tell us which most
4198  // fine grain dofs to concatenate!
4199  //
4200  // All the mapping vectors are constructed to respect the ordering of
4201  // the dof types.
4202 
4203  // Get the most fine grain block to dof mapping.
4204  Vector<unsigned> most_fine_grain_dof = Block_to_dof_map_fine[b];
4205 
4206  // How many vectors do we need to concatenate?
4207  const unsigned n_dof_vec = most_fine_grain_dof.size();
4208 
4209  if(n_dof_vec == 1)
4210  // No need to concatenate, just extract the vector.
4211  {
4212  internal_get_block_vector(most_fine_grain_dof[0],v,w);
4213  }
4214  else
4215  // Need to concatenate dof-level vectors.
4216  {
4217  Vector<DoubleVector> dof_vector(n_dof_vec);
4218 
4219  // Get all the dof-level vectors in one go
4220  internal_get_block_vectors(most_fine_grain_dof,
4221  v, dof_vector);
4222  // Build w with the correct distribution.
4223  w.build(Block_distribution_pt[b],0);
4224 
4225  // Concatenate the vectors.
4227 
4228  dof_vector.clear();
4229  }
4230  } // get_block_vector(...)
4231 
4232  //============================================================================
4233  /// \short Takes the n-th block ordered vector, b, and copies its entries
4234  /// to the appropriate entries in the naturally ordered vector, v.
4235  /// Here n is the block number in the current block preconditioner.
4236  /// If the preconditioner is a subsidiary block preconditioner
4237  /// the other entries in v that are not associated with it
4238  /// are left alone.
4239  ///
4240  /// This version works with the internal block types. This is legacy code
4241  /// but is kept alive, hence moved to private. Please use the
4242  /// function "return_block_vector(...)".
4243  //============================================================================
4244  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
4246  const DoubleVector& w,
4247  DoubleVector& v)
4248  const
4249  {
4250 #ifdef PARANOID
4251  // the number of blocks
4252  const unsigned n_blocks = this->internal_nblock_types();
4253 
4254  // paranoid check that block i is in this block preconditioner
4255  if (b >= n_blocks)
4256  {
4257  std::ostringstream error_message;
4258  error_message << "Requested block vector " << b
4259  << ", however this preconditioner has internal_nblock_types() "
4260  << "= " << internal_nblock_types() << std::endl;
4261  throw OomphLibError(error_message.str(),
4262  OOMPH_CURRENT_FUNCTION,
4263  OOMPH_EXCEPTION_LOCATION);
4264  }
4265  if (!v.built())
4266  {
4267  std::ostringstream error_message;
4268  error_message << "The distribution of the global vector v must be setup.";
4269  throw OomphLibError(error_message.str(),
4270  OOMPH_CURRENT_FUNCTION,
4271  OOMPH_EXCEPTION_LOCATION);
4272  }
4273  if (*v.distribution_pt() != *this->master_distribution_pt())
4274  {
4275  std::ostringstream error_message;
4276  error_message << "The distribution of the global vector v must match the "
4277  << " specified master_distribution_pt(). \n"
4278  << "i.e. Distribution_pt in the master preconditioner";
4279  throw OomphLibError(error_message.str(),
4280  OOMPH_CURRENT_FUNCTION,
4281  OOMPH_EXCEPTION_LOCATION);
4282  }
4283  if (!w.built())
4284  {
4285  std::ostringstream error_message;
4286  error_message << "The distribution of the block vector w must be setup.";
4287  throw OomphLibError(error_message.str(),
4288  OOMPH_CURRENT_FUNCTION,
4289  OOMPH_EXCEPTION_LOCATION);
4290  }
4291  if (*w.distribution_pt() != *Internal_block_distribution_pt[b])
4292  {
4293  std::ostringstream error_message;
4294  error_message << "The distribution of the block vector w must match the "
4295  << " specified distribution at Internal_block_distribution_pt[b]";
4296  throw OomphLibError(error_message.str(),
4297  OOMPH_CURRENT_FUNCTION,
4298  OOMPH_EXCEPTION_LOCATION);
4299  }
4300 #endif
4301 
4302  // if + only one processor
4303  // + more than one processor but matrix_pt is not distributed
4304  // then use the serial get_block method
4305  if (this->distribution_pt()->communicator_pt()->nproc() == 1 ||
4306  !this->distribution_pt()->distributed())
4307  {
4308 
4309  // length of vector
4310  unsigned n_row = this->internal_block_dimension(b);
4311 
4312  // copy back from the block vector to the naturally ordered vector
4313  double* v_pt = v.values_pt();
4314  const double* w_pt = w.values_pt();
4315  for (unsigned i = 0; i < n_row; i++)
4316  {
4317  v_pt[this->Global_index[b][i]] = w_pt[i];
4318  }
4319  }
4320  // otherwise use mpi
4321  else
4322  {
4323 #ifdef OOMPH_HAS_MPI
4324 
4325  // my rank
4326  unsigned my_rank = this->distribution_pt()->communicator_pt()->my_rank();
4327 
4328  // the number of processors
4329  unsigned nproc = this->distribution_pt()->communicator_pt()->nproc();
4330 
4331  // determine the maximum number of rows to be sent or recv
4332  unsigned max_n_send_or_recv = 0;
4333  for (unsigned p = 0; p < nproc; p++)
4334  {
4335  max_n_send_or_recv =
4336  std::max(max_n_send_or_recv,Nrows_to_send_for_get_block(b,p));
4337  max_n_send_or_recv =
4338  std::max(max_n_send_or_recv,Nrows_to_recv_for_get_block(b,p));
4339  }
4340 
4341  // create a vectors of 1s (the size of the nblock for the mpi indexed
4342  // data types
4343  int* block_lengths = new int[max_n_send_or_recv];
4344  for (unsigned i = 0; i < max_n_send_or_recv; i++)
4345  {
4346  block_lengths[i] = 1;
4347  }
4348 
4349  // perform the sends and receives
4350  Vector<MPI_Request> requests;
4351  for (unsigned p = 0; p < nproc; p++)
4352  {
4353  // send and recv with other processors
4354  if (p != my_rank)
4355  {
4356  if (Nrows_to_recv_for_get_block(b,p) > 0)
4357  {
4358  // create the send datatype
4359  MPI_Datatype type_send;
4360  MPI_Type_indexed(Nrows_to_recv_for_get_block(b,p),block_lengths,
4361  Rows_to_recv_for_get_block(b,p),MPI_DOUBLE,
4362  &type_send);
4363  MPI_Type_commit(&type_send);
4364 
4365  // send
4366  MPI_Request send_req;
4367  MPI_Isend(const_cast<double*>(w.values_pt()),1,type_send,p,0,
4368  this->distribution_pt()->communicator_pt()->mpi_comm(),
4369  &send_req);
4370  MPI_Type_free(&type_send);
4371  requests.push_back(send_req);
4372  }
4373 
4374  if (Nrows_to_send_for_get_block(b,p) > 0)
4375  {
4376  // create the recv datatype
4377  MPI_Datatype type_recv;
4378  MPI_Type_indexed(Nrows_to_send_for_get_block(b,p),block_lengths,
4379  Rows_to_send_for_get_block(b,p),MPI_DOUBLE,
4380  &type_recv);
4381  MPI_Type_commit(&type_recv);
4382 
4383  // recv
4384  MPI_Request recv_req;
4385  MPI_Irecv(v.values_pt(),1,type_recv,p,0,
4386  this->distribution_pt()->communicator_pt()->mpi_comm(),
4387  &recv_req);
4388  MPI_Type_free(&type_recv);
4389  requests.push_back(recv_req);
4390  }
4391  }
4392 
4393  // communicate wih self
4394  else
4395  {
4396  const double* w_values_pt = w.values_pt();
4397  double* v_values_pt = v.values_pt();
4398  for (unsigned i = 0; i < Nrows_to_send_for_get_block(b,p); i++)
4399  {
4400  v_values_pt[Rows_to_send_for_get_block(b,p)[i]] =
4401  w_values_pt[Rows_to_recv_for_get_block(b,p)[i]];
4402  }
4403  }
4404  }
4405 
4406  // and then just wait
4407  unsigned c = requests.size();
4408  Vector<MPI_Status> stat(c);
4409  if (c)
4410  {
4411  MPI_Waitall(c,&requests[0],&stat[0]);
4412  }
4413  delete[] block_lengths;
4414 
4415 #else
4416  // throw error
4417  std::ostringstream error_message;
4418  error_message << "The preconditioner is distributed and on more than one "
4419  << "processor. MPI is required.";
4420  throw OomphLibError(error_message.str(),
4421  OOMPH_CURRENT_FUNCTION,
4422  OOMPH_EXCEPTION_LOCATION);
4423 #endif
4424  }
4425  }
4426 
4427  //============================================================================
4428  /// \short Takes the n-th block ordered vector, b, and copies its entries
4429  /// to the appropriate entries in the naturally ordered vector, v.
4430  /// Here n is the block number in the current block preconditioner.
4431  /// If the preconditioner is a subsidiary block preconditioner
4432  /// the other entries in v that are not associated with it
4433  /// are left alone.
4434  //============================================================================
4435  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
4436  return_block_vector(const unsigned& n, const DoubleVector& b, DoubleVector& v)
4437  const
4438  {
4439 #ifdef PARANOID
4440  // the number of blocks
4441  const unsigned para_n_blocks = nblock_types();
4442 
4443  // paranoid check that block i is in this block preconditioner
4444  if (n >= para_n_blocks)
4445  {
4446  std::ostringstream err_msg;
4447  err_msg << "Requested block vector " << b
4448  << ", however this preconditioner has " << para_n_blocks
4449  << " block types.\n";
4450  throw OomphLibError(err_msg.str(),
4451  OOMPH_CURRENT_FUNCTION,
4452  OOMPH_EXCEPTION_LOCATION);
4453  }
4454  if (!v.built())
4455  {
4456  std::ostringstream err_msg;
4457  err_msg << "The distribution of the global vector v must be setup.";
4458  throw OomphLibError(err_msg.str(),
4459  OOMPH_CURRENT_FUNCTION,
4460  OOMPH_EXCEPTION_LOCATION);
4461  }
4462  if (*v.distribution_pt() != *this->master_distribution_pt())
4463  {
4464  std::ostringstream err_msg;
4465  err_msg << "The distribution of the global vector v must match the "
4466  << " specified master_distribution_pt(). \n"
4467  << "i.e. Distribution_pt in the master preconditioner";
4468  throw OomphLibError(err_msg.str(),
4469  OOMPH_CURRENT_FUNCTION,
4470  OOMPH_EXCEPTION_LOCATION);
4471  }
4472  if (!b.built())
4473  {
4474  std::ostringstream err_msg;
4475  err_msg << "The distribution of the block vector b must be setup.";
4476  throw OomphLibError(err_msg.str(),
4477  OOMPH_CURRENT_FUNCTION,
4478  OOMPH_EXCEPTION_LOCATION);
4479  }
4480 
4481 #endif
4482 
4483  // Get the most fine grain dof
4484  Vector<unsigned> most_fine_grain_dof = Block_to_dof_map_fine[n];
4485 
4486  // How many dofs are in this block?
4487  const unsigned n_dof_vec = Block_to_dof_map_fine[n].size();
4488 
4489  if(n_dof_vec == 1)
4490  // There is only one dof, no need to split.
4491  {
4492  internal_return_block_vector(most_fine_grain_dof[0],b,v);
4493  }
4494  else
4495  // Need to split the vector up before we insert them all in one go.
4496  {
4497  Vector<DoubleVector> dof_vector(n_dof_vec);
4498  for (unsigned d = 0; d < n_dof_vec; d++)
4499  {
4500  dof_vector[d].build(internal_block_distribution_pt(
4501  most_fine_grain_dof[d]));
4502  }
4503 
4505 
4506  // return to v
4507  internal_return_block_vectors(most_fine_grain_dof,
4508  dof_vector,v);
4509  }
4510  } // return_block_vector(...)
4511 
4512  //============================================================================
4513  /// \short Given the naturally ordered vector, v, return
4514  /// the vector rearranged in block order in w. This is a legacy function
4515  /// from the old block preconditioning framework. Kept alive in case it may
4516  /// be needed again.
4517  ///
4518  /// This uses the variables ending in "get_ordered". We no longer use this
4519  /// type of method. This function copy values from v and re-order them
4520  /// in "block order" and place them in w. Block order means that the
4521  /// values in w are the same as the concatenated block vectors.
4522  ///
4523  /// I.e. - v is naturally ordered.
4524  /// v -> s_b, v is ordered into blocks vectors
4525  /// (requires communication)
4526  /// concatenate_without_communication(s_{0,...,nblocks},w) gives w.
4527  ///
4528  /// But this function skips out the concatenation part and builds w directly
4529  /// from v.
4530  ///
4531  /// This is nice but the function is implemented in such a way that it
4532  /// always use all the (internal) blocks and concatenated with the
4533  /// identity ordering. I.e. if this preconditioner has 3 block types, then
4534  /// w will always be:
4535  /// concatenate_without_communication([s_0, s_1, s_2], w). There is easy
4536  /// way to change this.
4537  ///
4538  /// Furthermore, it does not take into account the new dof type coarsening
4539  /// feature. So this function will most likely produce the incorrect vector
4540  /// w from what the user intended. It still works, but w will be the
4541  /// concatenation of the most fine grain dof block vectors with the
4542  /// "natural" dof type ordering.
4543  ///
4544  /// This has been superseded by the function
4545  /// get_block_ordered_preconditioner_vector(...) which does the correct
4546  /// thing.
4547  //============================================================================
4548  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
4550  DoubleVector& w)
4551  const
4552  {
4553 #ifdef PARANOID
4554  if (!v.built())
4555  {
4556  std::ostringstream error_message;
4557  error_message << "The distribution of the global vector v must be setup.";
4558  throw OomphLibError(error_message.str(),
4559  OOMPH_CURRENT_FUNCTION,
4560  OOMPH_EXCEPTION_LOCATION);
4561  }
4562  if (*v.distribution_pt() != *this->master_distribution_pt())
4563  {
4564  std::ostringstream error_message;
4565  error_message << "The distribution of the global vector v must match the "
4566  << " specified master_distribution_pt(). \n"
4567  << "i.e. Distribution_pt in the master preconditioner";
4568  throw OomphLibError(error_message.str(),
4569  OOMPH_CURRENT_FUNCTION,
4570  OOMPH_EXCEPTION_LOCATION);
4571  }
4572 #endif
4573 
4574  // Cleared and resized w for reordered vector
4575  w.build(this->internal_preconditioner_matrix_distribution_pt(),0.0);
4576 
4577  // if + only one processor
4578  // + more than one processor but matrix_pt is not distributed
4579  // then use the serial get_block method
4580  if (this->distribution_pt()->communicator_pt()->nproc() == 1 ||
4581  !this->distribution_pt()->distributed())
4582  {
4583 
4584  // number of blocks
4585  unsigned nblock = this->Internal_nblock_types;
4586 
4587  // copy to w
4588  unsigned block_offset = 0;
4589  double* w_pt = w.values_pt();
4590  const double* v_pt = v.values_pt();
4591  for (unsigned b = 0; b < nblock;b++)
4592  {
4593  unsigned block_nrow = this->internal_block_dimension(b);
4594  for (unsigned i = 0; i < block_nrow; i++)
4595  {
4596  w_pt[block_offset+i] = v_pt[this->Global_index[b][i]];
4597  }
4598  block_offset += block_nrow;
4599  }
4600  }
4601  // otherwise use mpi
4602  else
4603  {
4604 #ifdef OOMPH_HAS_MPI
4605 
4606  // my rank
4607  unsigned my_rank = this->distribution_pt()->communicator_pt()->my_rank();
4608 
4609  // the number of processors
4610  unsigned nproc = this->distribution_pt()->communicator_pt()->nproc();
4611 
4612  // determine the maximum number of rows to be sent or recv
4613  unsigned max_n_send_or_recv = 0;
4614  for (unsigned p = 0; p < nproc; p++)
4615  {
4616  max_n_send_or_recv =
4617  std::max(max_n_send_or_recv,Nrows_to_send_for_get_ordered[p]);
4618  max_n_send_or_recv =
4619  std::max(max_n_send_or_recv,Nrows_to_recv_for_get_ordered[p]);
4620  }
4621 
4622  // create a vectors of 1s (the size of the nblock for the mpi indexed
4623  // data types
4624  int* block_lengths = new int[max_n_send_or_recv];
4625  for (unsigned i = 0; i < max_n_send_or_recv; i++)
4626  {
4627  block_lengths[i] = 1;
4628  }
4629 
4630  // perform the sends and receives
4631  Vector<MPI_Request> requests;
4632  for (unsigned p = 0; p < nproc; p++)
4633  {
4634  // send and recv with other processors
4635  if (p != my_rank)
4636  {
4637  if (Nrows_to_send_for_get_ordered[p] > 0)
4638  {
4639  // create the send datatype
4640  MPI_Datatype type_send;
4641  MPI_Type_indexed(Nrows_to_send_for_get_ordered[p],block_lengths,
4642  Rows_to_send_for_get_ordered[p],MPI_DOUBLE,
4643  &type_send);
4644  MPI_Type_commit(&type_send);
4645 
4646  // send
4647  MPI_Request send_req;
4648  MPI_Isend(const_cast<double*>(v.values_pt()),1,type_send,p,0,
4649  this->distribution_pt()->communicator_pt()->mpi_comm(),
4650  &send_req);
4651  MPI_Type_free(&type_send);
4652  requests.push_back(send_req);
4653  }
4654 
4655  if (Nrows_to_recv_for_get_ordered[p] > 0)
4656  {
4657  // create the recv datatype
4658  MPI_Datatype type_recv;
4659  MPI_Type_indexed(Nrows_to_recv_for_get_ordered[p],block_lengths,
4660  Rows_to_recv_for_get_ordered[p],MPI_DOUBLE,
4661  &type_recv);
4662  MPI_Type_commit(&type_recv);
4663 
4664  // recv
4665  MPI_Request recv_req;
4666  MPI_Irecv(w.values_pt(),1,type_recv,p,0,
4667  this->distribution_pt()->communicator_pt()->mpi_comm(),
4668  &recv_req);
4669  MPI_Type_free(&type_recv);
4670  requests.push_back(recv_req);
4671  }
4672  }
4673 
4674  // communicate with self
4675  else
4676  {
4677  double* w_values_pt = w.values_pt();
4678  const double* v_values_pt = v.values_pt();
4679  for (unsigned i = 0; i < Nrows_to_send_for_get_ordered[p]; i++)
4680  {
4681  w_values_pt[Rows_to_recv_for_get_ordered[p][i]] =
4682  v_values_pt[Rows_to_send_for_get_ordered[p][i]];
4683  }
4684  }
4685  }
4686 
4687  // and then just wait
4688  unsigned c = requests.size();
4689  Vector<MPI_Status> stat(c);
4690  if (c)
4691  {
4692  MPI_Waitall(c,&requests[0],&stat[0]);
4693  }
4694  delete[] block_lengths;
4695 
4696 #else
4697  // throw error
4698  std::ostringstream error_message;
4699  error_message << "The preconditioner is distributed and on more than one "
4700  << "processor. MPI is required.";
4701  throw OomphLibError(error_message.str(),
4702  OOMPH_CURRENT_FUNCTION,
4703  OOMPH_EXCEPTION_LOCATION);
4704 #endif
4705  }
4706  }
4707 
4708  //============================================================================
4709  /// \short Given the naturally ordered vector, v, return
4710  /// the vector rearranged in block order in w. This function calls
4711  /// get_concatenated_block_vector(...) with the identity block mapping.
4712  ///
4713  /// This function has been re-written to work with the new dof type
4714  /// coarsening feature. The old function is kept alive in
4715  /// internal_get_block_ordered_preconditioner_vector(...) and is moved to
4716  /// the private section of the code. The differences between the two are:
4717  ///
4718  /// 1) This function extracts all the block vectors (in one go) via the
4719  /// function internal_get_block_vectors(...), and concatenates them.
4720  ///
4721  /// 2) The old function makes use of the variables ending in "get_ordered",
4722  /// thus is slightly more efficient since it does not have to concatenate
4723  /// any block vectors.
4724  ///
4725  /// 3) The old function no longer respect the new indirections if dof types
4726  /// have been coarsened.
4727  ///
4728  /// 4) This function extracts the most fine grain dof-level vectors and
4729  /// concatenates them. These dof-level vectors respect the re-ordering
4730  /// caused by the coarsening of dof types. The overhead associated with
4731  /// concatenating DoubleVectors without communication is very small.
4732  ///
4733  /// This function should be used.
4734  //============================================================================
4735  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
4737  DoubleVector& w)
4738  {
4739 #ifdef PARANOID
4740  if (!v.built())
4741  {
4742  std::ostringstream error_message;
4743  error_message << "The distribution of the global vector v must be setup.";
4744  throw OomphLibError(error_message.str(),
4745  OOMPH_CURRENT_FUNCTION,
4746  OOMPH_EXCEPTION_LOCATION);
4747  }
4748  if (*v.distribution_pt() != *this->master_distribution_pt())
4749  {
4750  std::ostringstream error_message;
4751  error_message << "The distribution of the global vector v must match the "
4752  << " specified master_distribution_pt(). \n"
4753  << "i.e. Distribution_pt in the master preconditioner";
4754  throw OomphLibError(error_message.str(),
4755  OOMPH_CURRENT_FUNCTION,
4756  OOMPH_EXCEPTION_LOCATION);
4757  }
4758 #endif
4759 
4760  // Get the number of blocks.
4761  unsigned nblocks = this->nblock_types();
4762 
4763  // Fill in the identity mapping.
4764  Vector<unsigned> block_vec_number(nblocks,0);
4765  for (unsigned b = 0; b < nblocks; b++)
4766  {
4767  block_vec_number[b]=b;
4768  }
4769 
4770  // Do the work.
4771  get_concatenated_block_vector(block_vec_number,v,w);
4772  } // get_block_ordered_preconditioner_vector(...)
4773 
4774  //============================================================================
4775  /// \short Takes the block ordered vector, w, and reorders it in the natural
4776  /// order. Reordered vector is returned in v. Note: If the preconditioner is
4777  /// a subsidiary preconditioner then only the components of the vector
4778  /// associated with the blocks of the subsidiary preconditioner will be
4779  /// included. Hence the length of v is master_nrow() whereas that of the
4780  /// vector w is of length this->nrow().
4781  ///
4782  /// This is the return function for the function
4783  /// internal_get_block_ordered_preconditioner_vector(...).
4784  /// Both internal_get_block_ordered_preconditioner_vector(...) and
4785  /// internal_return_block_ordered_preconditioner_vector(...) has been
4786  /// superseded by the functions
4787  ///
4788  /// get_block_ordered_preconditioner_vector(...) and
4789  /// return_block_ordered_preconditioner_vector(...),
4790  ///
4791  /// Thus this function is moved to the private section of the code.
4792  //============================================================================
4793  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
4795  DoubleVector& v) const
4796  {
4797 #ifdef PARANOID
4798  if (!v.built())
4799  {
4800  std::ostringstream error_message;
4801  error_message << "The distribution of the global vector v must be setup.";
4802  throw OomphLibError(error_message.str(),
4803  OOMPH_CURRENT_FUNCTION,
4804  OOMPH_EXCEPTION_LOCATION);
4805  }
4806  if (*v.distribution_pt() != *this->master_distribution_pt())
4807  {
4808  std::ostringstream error_message;
4809  error_message << "The distribution of the global vector v must match the "
4810  << " specified master_distribution_pt(). \n"
4811  << "i.e. Distribution_pt in the master preconditioner";
4812  throw OomphLibError(error_message.str(),
4813  OOMPH_CURRENT_FUNCTION,
4814  OOMPH_EXCEPTION_LOCATION);
4815  }
4816  if (!w.built())
4817  {
4818  std::ostringstream error_message;
4819  error_message << "The distribution of the block vector w must be setup.";
4820  throw OomphLibError(error_message.str(),
4821  OOMPH_CURRENT_FUNCTION,
4822  OOMPH_EXCEPTION_LOCATION);
4823  }
4824  if (*w.distribution_pt() != *this->internal_preconditioner_matrix_distribution_pt())
4825  {
4826  std::ostringstream error_message;
4827  error_message << "The distribution of the block vector w must match the "
4828  << " specified distribution at Distribution_pt[b]";
4829  throw OomphLibError(error_message.str(),
4830  OOMPH_CURRENT_FUNCTION,
4831  OOMPH_EXCEPTION_LOCATION);
4832  }
4833 #endif
4834 
4835 
4836  // if + only one processor
4837  // + more than one processor but matrix_pt is not distributed
4838  // then use the serial get_block method
4839  if (this->distribution_pt()->communicator_pt()->nproc() == 1 ||
4840  !this->distribution_pt()->distributed())
4841  {
4842  // number of blocks
4843  unsigned nblock = this->Internal_nblock_types;
4844 
4845  // copy to w
4846  unsigned block_offset = 0;
4847  const double* w_pt = w.values_pt();
4848  double* v_pt = v.values_pt();
4849  for (unsigned b = 0; b < nblock;b++)
4850  {
4851  unsigned block_nrow = this->internal_block_dimension(b);
4852  for (unsigned i = 0; i < block_nrow; i++)
4853  {
4854  v_pt[this->Global_index[b][i]] = w_pt[block_offset+i];
4855  }
4856  block_offset += block_nrow;
4857  }
4858  }
4859  // otherwise use mpi
4860  else
4861  {
4862 #ifdef OOMPH_HAS_MPI
4863 
4864  // my rank
4865  unsigned my_rank = this->distribution_pt()->communicator_pt()->my_rank();
4866 
4867  // the number of processors
4868  unsigned nproc = this->distribution_pt()->communicator_pt()->nproc();
4869 
4870  // determine the maximum number of rows to be sent or recv
4871  unsigned max_n_send_or_recv = 0;
4872  for (unsigned p = 0; p < nproc; p++)
4873  {
4874  max_n_send_or_recv =
4875  std::max(max_n_send_or_recv,Nrows_to_send_for_get_ordered[p]);
4876  max_n_send_or_recv =
4877  std::max(max_n_send_or_recv,Nrows_to_recv_for_get_ordered[p]);
4878  }
4879 
4880  // create a vectors of 1s (the size of the nblock for the mpi indexed
4881  // data types
4882  int* block_lengths = new int[max_n_send_or_recv];
4883  for (unsigned i = 0; i < max_n_send_or_recv; i++)
4884  {
4885  block_lengths[i] = 1;
4886  }
4887 
4888  // perform the sends and receives
4889  Vector<MPI_Request> requests;
4890  for (unsigned p = 0; p < nproc; p++)
4891  {
4892  // send and recv with other processors
4893  if (p != my_rank)
4894  {
4895  if (Nrows_to_recv_for_get_ordered[p] > 0)
4896  {
4897  // create the send datatype
4898  MPI_Datatype type_send;
4899  MPI_Type_indexed(Nrows_to_recv_for_get_ordered[p],block_lengths,
4900  Rows_to_recv_for_get_ordered[p],MPI_DOUBLE,
4901  &type_send);
4902  MPI_Type_commit(&type_send);
4903 
4904  // send
4905  MPI_Request send_req;
4906  MPI_Isend(const_cast<double*>(w.values_pt()),1,type_send,p,0,
4907  this->distribution_pt()->communicator_pt()->mpi_comm(),
4908  &send_req);
4909  MPI_Type_free(&type_send);
4910  requests.push_back(send_req);
4911  }
4912 
4913  if (Nrows_to_send_for_get_ordered[p] > 0)
4914  {
4915  // create the recv datatype
4916  MPI_Datatype type_recv;
4917  MPI_Type_indexed(Nrows_to_send_for_get_ordered[p],block_lengths,
4918  Rows_to_send_for_get_ordered[p],MPI_DOUBLE,
4919  &type_recv);
4920  MPI_Type_commit(&type_recv);
4921 
4922  // recv
4923  MPI_Request recv_req;
4924  MPI_Irecv(v.values_pt(),1,type_recv,p,0,
4925  this->distribution_pt()->communicator_pt()->mpi_comm(),&recv_req);
4926  MPI_Type_free(&type_recv);
4927  requests.push_back(recv_req);
4928  }
4929  }
4930 
4931  // communicate wih self
4932  else
4933  {
4934  const double* w_values_pt = w.values_pt();
4935  double* v_values_pt = v.values_pt();
4936  for (unsigned i = 0; i < Nrows_to_send_for_get_ordered[p]; i++)
4937  {
4938  v_values_pt[Rows_to_send_for_get_ordered[p][i]] =
4939  w_values_pt[Rows_to_recv_for_get_ordered[p][i]];
4940  }
4941  }
4942  }
4943 
4944  // and then just wait
4945  unsigned c = requests.size();
4946  Vector<MPI_Status> stat(c);
4947  if (c)
4948  {
4949  MPI_Waitall(c,&requests[0],&stat[0]);
4950  }
4951  delete[] block_lengths;
4952 
4953 #else
4954  // throw error
4955  std::ostringstream error_message;
4956  error_message << "The preconditioner is distributed and on more than one "
4957  << "processor. MPI is required.";
4958  throw OomphLibError(error_message.str(),
4959  OOMPH_CURRENT_FUNCTION,
4960  OOMPH_EXCEPTION_LOCATION);
4961 #endif
4962  } // else use mpi
4963  } // function return_block_ordered_preconditioner_vector
4964 
4965 
4966  //============================================================================
4967  /// \short Takes the block ordered vector, w, and reorders it in natural
4968  /// order. Reordered vector is returned in v. Note: If the preconditioner is
4969  /// a subsidiary preconditioner then only the components of the vector
4970  /// associated with the blocks of the subsidiary preconditioner will be
4971  /// included. Hence the length of v is master_nrow() whereas that of the
4972  /// vector w is of length this->nrow().
4973  ///
4974  /// This is the return function for the function
4975  /// get_block_ordered_preconditioner_vector(...).
4976  ///
4977  /// It calls the function return_concatenated_block_vector(...) with the
4978  /// identity block number ordering.
4979  //============================================================================
4980  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
4982  DoubleVector& v) const
4983  {
4984 #ifdef PARANOID
4985  if (!v.built())
4986  {
4987  std::ostringstream error_message;
4988  error_message << "The distribution of the global vector v must be setup.";
4989  throw OomphLibError(error_message.str(),
4990  OOMPH_CURRENT_FUNCTION,
4991  OOMPH_EXCEPTION_LOCATION);
4992  }
4993  if (*v.distribution_pt() != *this->master_distribution_pt())
4994  {
4995  std::ostringstream error_message;
4996  error_message << "The distribution of the global vector v must match the "
4997  << " specified master_distribution_pt(). \n"
4998  << "i.e. Distribution_pt in the master preconditioner";
4999  throw OomphLibError(error_message.str(),
5000  OOMPH_CURRENT_FUNCTION,
5001  OOMPH_EXCEPTION_LOCATION);
5002  }
5003  if (!w.built())
5004  {
5005  std::ostringstream error_message;
5006  error_message << "The distribution of the block vector w must be setup.";
5007  throw OomphLibError(error_message.str(),
5008  OOMPH_CURRENT_FUNCTION,
5009  OOMPH_EXCEPTION_LOCATION);
5010  }
5011  if (*w.distribution_pt() != *this->preconditioner_matrix_distribution_pt())
5012  {
5013  std::ostringstream error_message;
5014  error_message << "The distribution of the block vector w must match the "
5015  << "concatenations of distributions in "
5016  << "Block_distribution_pt.\n";
5017  throw OomphLibError(error_message.str(),
5018  OOMPH_CURRENT_FUNCTION,
5019  OOMPH_EXCEPTION_LOCATION);
5020  }
5021 #endif
5022 
5023  // Split into the block vectors.
5024  const unsigned nblocks = nblock_types();
5025  Vector<unsigned> block_vec_number(nblocks,0);
5026  for (unsigned b = 0; b < nblocks; b++)
5027  {
5028  block_vec_number[b] = b;
5029  }
5030 
5031  return_concatenated_block_vector(block_vec_number,w,v);
5032  } // function return_block_ordered_preconditioner_vector
5033 
5034 //=============================================================================
5035 /// \short Gets block (i,j) from the matrix pointed to by
5036 /// Matrix_pt and returns it in output_block. This is associated with the
5037 /// internal blocks. Please use the other get_block(...) function.
5038 //=============================================================================
5039  template<>
5041  internal_get_block(const unsigned& block_i, const unsigned& block_j,
5042  CRDoubleMatrix& output_block) const
5043  {
5044 
5045 #ifdef PARANOID
5046  // the number of blocks
5047  const unsigned n_blocks = this->internal_nblock_types();
5048 
5049  // paranoid check that block i is in this block preconditioner
5050  if (block_i >= n_blocks || block_j >= n_blocks)
5051  {
5052  std::ostringstream error_message;
5053  error_message << "Requested block (" << block_i << "," << block_j
5054  << "), however this preconditioner has internal_nblock_types() "
5055  << "= " << internal_nblock_types() << std::endl;
5056  throw OomphLibError(error_message.str(),
5057  OOMPH_CURRENT_FUNCTION,
5058  OOMPH_EXCEPTION_LOCATION);
5059  }
5060 
5061  // Check that the matrix is the same as that of the master
5062  if(is_subsidiary_block_preconditioner())
5063  {
5064  if(master_block_preconditioner_pt()->matrix_pt() != matrix_pt())
5065  {
5066  std::string err = "Master and subs should have same matrix.";
5067  throw OomphLibError(err, OOMPH_CURRENT_FUNCTION,
5068  OOMPH_EXCEPTION_LOCATION);
5069  }
5070  }
5071 #endif
5072 
5073  // Cast the pointer
5074  CRDoubleMatrix* cr_matrix_pt = dynamic_cast<CRDoubleMatrix*>(matrix_pt());
5075 
5076  // if + only one processor
5077  // + more than one processor but matrix_pt is not distributed
5078  // then use the serial get_block method
5079  if (cr_matrix_pt->distribution_pt()->communicator_pt()->nproc() == 1 ||
5080  !cr_matrix_pt->distribution_pt()->distributed())
5081  {
5082  // pointers for the jacobian matrix is compressed row sparse format
5083  int* j_row_start;
5084  int* j_column_index;
5085  double* j_value;
5086 
5087  // sets pointers to jacobian matrix
5088  j_row_start = cr_matrix_pt->row_start();
5089  j_column_index = cr_matrix_pt->column_index();
5090  j_value = cr_matrix_pt->value();
5091 
5092  // get the block dimensions
5093  unsigned block_nrow = this->internal_block_dimension(block_i);
5094  unsigned block_ncol = this->internal_block_dimension(block_j);
5095 
5096  // allocate temporary storage for the component vectors of block (i,j)
5097  // temp_ptr is used to point to an element in each column - required as
5098  // cannot assume that order of block's rows in jacobian and the block
5099  // matrix will be the same
5100  int* temp_row_start = new int[block_nrow+1];
5101  for (unsigned i = 0; i <= block_nrow; i++)
5102  {
5103  temp_row_start[i] = 0;
5104  }
5105  Vector<int> temp_ptr(block_nrow+1);
5106  int block_nnz = 0;
5107 
5108  // get number of rows in source matrix
5109  unsigned master_nrow = this->master_nrow();
5110 
5111  // determine how many non zeros there are in the block (i,j)
5112  // also determines how many non zeros are stored in each row or column -
5113  // stored in temp_ptr temporarily
5114  for (unsigned k = 0; k < master_nrow; k++)
5115  {
5116  if (internal_block_number(k) == static_cast<int>(block_i))
5117  {
5118  for (int l = j_row_start[k];
5119  l < j_row_start[k+1]; l++)
5120  {
5121  if (internal_block_number(j_column_index[l]) ==
5122  static_cast<int>(block_j))
5123  {
5124  block_nnz++;
5125  temp_ptr[internal_index_in_block(k)+1]++;
5126  }
5127  }
5128  }
5129  }
5130 
5131  // if the matrix is not empty
5132  int* temp_column_index = new int[block_nnz];
5133  double* temp_value = new double[block_nnz];
5134  if (block_nnz > 0)
5135  {
5136 
5137  // uses number of elements in each column of block to determine values
5138  // for the block column start (temp_row_start)
5139  temp_row_start[0] = 0;
5140  for (unsigned k = 1; k <= block_nrow; k++)
5141  {
5142  temp_row_start[k] = temp_row_start[k-1]+temp_ptr[k];
5143  temp_ptr[k] = temp_row_start[k];
5144  }
5145 
5146  // copies the relevant elements of the jacobian to the correct entries
5147  // of the block matrix
5148  for (unsigned k = 0; k < master_nrow; k++)
5149  {
5150  if (internal_block_number(k) == static_cast<int>(block_i))
5151  {
5152  for (int l = j_row_start[k];
5153  l < j_row_start[k+1]; l++)
5154  {
5155  if (internal_block_number(j_column_index[l]) ==
5156  static_cast<int>(block_j))
5157  {
5158  int kk = temp_ptr[internal_index_in_block(k)]++;
5159  temp_value[kk] = j_value[l];
5160  temp_column_index[kk] =
5161  internal_index_in_block(j_column_index[l]);
5162  }
5163  }
5164  }
5165  }
5166  }
5167 
5168 
5169  // Fill in the compressed row matrix ??ds Note: I kept the calls to
5170  // build as close as I could to before (had to replace new(dist) with
5171  // .build(dist) ).
5172  output_block.build(Internal_block_distribution_pt[block_i]);
5173  output_block.build_without_copy(block_ncol,block_nnz,
5174  temp_value,temp_column_index,
5175  temp_row_start);
5176 
5177 #ifdef PARANOID
5178  // checks to see if block matrix has been set up correctly
5179  // block_matrix_test(matrix_pt,block_i,block_j,block_pt);
5180  if (Run_block_matrix_test)
5181  {
5182  // checks to see if block matrix has been set up correctly
5183  block_matrix_test(block_i, block_j, &output_block);
5184  }
5185 #endif
5186  }
5187 
5188 
5189  // otherwise we are dealing with a distributed matrix
5190  else
5191  {
5192 #ifdef OOMPH_HAS_MPI
5193  // number of processors
5194  unsigned nproc = this->distribution_pt()->communicator_pt()->nproc();
5195 
5196  // my rank
5197  unsigned my_rank = this->distribution_pt()->communicator_pt()->my_rank();
5198 
5199  // sets pointers to jacobian matrix
5200  int* j_row_start = cr_matrix_pt->row_start();
5201  int* j_column_index = cr_matrix_pt->column_index();
5202  double* j_value = cr_matrix_pt->value();
5203 
5204  // number of non zeros in each row to be sent
5205  Vector<int*> nnz_send(nproc,0);
5206 
5207  // number of non zeros in each row to be received
5208  Vector<int*> nnz_recv(nproc,0);
5209 
5210  // storage for data to be sent
5211  Vector<int*> column_index_for_proc(nproc,0);
5212  Vector<double*> values_for_proc(nproc,0);
5213 
5214  // number of non zeros to be sent to each processor
5215  Vector<unsigned> total_nnz_send(nproc,0);
5216 
5217  // number of rows of the block matrix on this processor
5218  unsigned nrow_local = Internal_block_distribution_pt[block_i]->nrow_local();
5219 
5220  // resize the nnz storage and compute nnz_send
5221  // and send and recv the nnz
5222  Vector<MPI_Request> send_req;
5223  Vector<MPI_Request> recv1_req;
5224  for (unsigned p = 0; p < nproc; p++)
5225  {
5226  int nrow_send = Nrows_to_send_for_get_block(block_i,p);
5227  int nrow_recv = Nrows_to_recv_for_get_block(block_i,p);
5228 
5229  // assemble nnz recv
5230  nnz_recv[p] = new int[nrow_recv];
5231 
5232  // assemble the storage to send
5233  if (nrow_send > 0 && p != my_rank)
5234  {
5235  nnz_send[p] = new int[nrow_send];
5236  }
5237 
5238  // compute the number of nnzs in each row and the total number
5239  // of nnzs
5240  for (int i = 0; i < nrow_send; i++)
5241  {
5242  unsigned row = Rows_to_send_for_get_block(block_i,p)[i];
5243  int c = 0;
5244  for (int r = j_row_start[row]; r < j_row_start[row+1]; r++)
5245  {
5246  if (internal_block_number(j_column_index[r]) == int(block_j))
5247  {
5248  c++;
5249  }
5250  }
5251  if (p != my_rank)
5252  {
5253  nnz_send[p][i] = c;
5254  }
5255  else
5256  {
5257  nnz_recv[p][i] = c;
5258  }
5259  total_nnz_send[p] += c;
5260  }
5261 
5262  // send
5263  if (p != my_rank)
5264  {
5265  if (nrow_send)
5266  {
5267  MPI_Request req;
5268  MPI_Isend(nnz_send[p],nrow_send,MPI_INT,p,0,
5269  this->distribution_pt()->communicator_pt()->mpi_comm(),
5270  &req);
5271  send_req.push_back(req);
5272  }
5273 
5274  // recv
5275  if (nrow_recv)
5276  {
5277  MPI_Request req;
5278  MPI_Irecv(nnz_recv[p],nrow_recv,MPI_INT,p,0,
5279  this->distribution_pt()->communicator_pt()->mpi_comm(),
5280  &req);
5281  recv1_req.push_back(req);
5282  }
5283  }
5284  }
5285 
5286  // next assemble the values and row_start data to be sent for each
5287  // processor
5288  for (unsigned p = 0; p < nproc; p++)
5289  {
5290  int nrow_send = Nrows_to_send_for_get_block(block_i,p);
5291 
5292  // assemble the storage for the values and column indices to be sent
5293  if (p != my_rank)
5294  {
5295  if (total_nnz_send[p] > 0)
5296  {
5297  values_for_proc[p] = new double[total_nnz_send[p]];
5298  column_index_for_proc[p] = new int[total_nnz_send[p]];
5299 
5300  // copy the values and column indices to the storage
5301  unsigned ptr = 0;
5302  for (int i = 0; i < nrow_send; i++)
5303  {
5304  unsigned row = Rows_to_send_for_get_block(block_i,p)[i];
5305  for (int r = j_row_start[row]; r < j_row_start[row+1]; r++)
5306  {
5307  if (internal_block_number(j_column_index[r]) == int(block_j))
5308  {
5309  values_for_proc[p][ptr] = j_value[r];
5310  column_index_for_proc[p][ptr] =
5311  internal_index_in_block(j_column_index[r]);
5312  ptr++;
5313  }
5314  }
5315  }
5316 
5317  // create the datatypes
5318  MPI_Datatype types[2];
5319  MPI_Type_contiguous(total_nnz_send[p],MPI_DOUBLE,&types[0]);
5320  MPI_Type_commit(&types[0]);
5321  MPI_Type_contiguous(total_nnz_send[p],MPI_INT,&types[1]);
5322  MPI_Type_commit(&types[1]);
5323 
5324  // get the start address of the vectors
5325  MPI_Aint displacement[2];
5326  MPI_Address(values_for_proc[p],&displacement[0]);
5327  MPI_Address(column_index_for_proc[p],&displacement[1]);
5328 
5329  // compute the displacements
5330  displacement[1] -= displacement[0];
5331  displacement[0] -= displacement[0];
5332 
5333  // compute the block lengths
5334  int length[2];
5335  length[0] = length[1] = 1;
5336 
5337  // build the struct data type
5338  MPI_Datatype final_type;
5339  MPI_Type_struct(2,length,displacement,types,&final_type);
5340  MPI_Type_commit(&final_type);
5341  MPI_Type_free(&types[0]);
5342  MPI_Type_free(&types[1]);
5343 
5344  // and send
5345  MPI_Request req;
5346  MPI_Isend(values_for_proc[p],1,final_type,p,1,
5347  this->distribution_pt()->communicator_pt()->mpi_comm(),
5348  &req);
5349  send_req.push_back(req);
5350  MPI_Type_free(&final_type);
5351  }
5352  }
5353  }
5354 
5355  // wait for the recv to complete (the row_start recv which actually
5356  // contains the number of nnzs in each row)
5357  int c_recv = recv1_req.size();
5358  if (c_recv != 0)
5359  {
5360  MPI_Waitall(c_recv,&recv1_req[0],MPI_STATUS_IGNORE);
5361  }
5362 
5363  // compute the total number of nnzs to be received
5364  Vector<int> total_nnz_recv_from_proc(nproc);
5365  int local_block_nnz = 0;
5366  for (unsigned p = 0; p < nproc; p++)
5367  {
5368  // compute the total nnzs
5369  for (unsigned i = 0; i < Nrows_to_recv_for_get_block(block_i,p); i++)
5370  {
5371  total_nnz_recv_from_proc[p] += nnz_recv[p][i];
5372 
5373  }
5374  local_block_nnz += total_nnz_recv_from_proc[p];
5375  }
5376 
5377  // compute the offset for each block of nnzs (a matrix row) in the
5378  // values_recv and column_index_recv vectors
5379 
5380  // fisrt determine how many blocks of rows are to be recv
5381  Vector<int> n_recv_block(nproc,0);
5382  for (unsigned p = 0; p < nproc; p++)
5383  {
5384  if (Nrows_to_recv_for_get_block(block_i,p) > 0)
5385  {
5386  n_recv_block[p] = 1;
5387  }
5388  for (unsigned i = 1; i < Nrows_to_recv_for_get_block(block_i,p); i++)
5389  {
5390  if (Rows_to_recv_for_get_block(block_i,p)[i] !=
5391  Rows_to_recv_for_get_block(block_i,p)[i-1] + 1)
5392  {
5393  n_recv_block[p]++;
5394  }
5395  }
5396  }
5397 
5398  // next assemble row start recv
5399  int* row_start_recv = new int[nrow_local+1];
5400  for (unsigned i = 0; i <= nrow_local; i++)
5401  {
5402  row_start_recv[i] = 0;
5403  }
5404  for (unsigned p = 0; p < nproc; p++)
5405  {
5406  for (unsigned i = 0; i < Nrows_to_recv_for_get_block(block_i,p); i++)
5407  {
5408  row_start_recv[Rows_to_recv_for_get_block(block_i,p)[i]]
5409  = nnz_recv[p][i];
5410  }
5411  }
5412  int g = row_start_recv[0];
5413  row_start_recv[0] = 0;
5414  for (unsigned i = 1; i < nrow_local; i++)
5415  {
5416  int temp_g = g;
5417  g = row_start_recv[i];
5418  row_start_recv[i] = row_start_recv[i-1] + temp_g;
5419  }
5420  row_start_recv[nrow_local] = row_start_recv[nrow_local-1] + g;
5421 
5422  // next assemble the offset and the number of nzs in each recv block
5423  Vector<int*> offset_recv_block(nproc,0);
5424  Vector<int*> nnz_recv_block(nproc,0);
5425  for (unsigned p = 0; p < nproc; p++)
5426  {
5427  if (Nrows_to_recv_for_get_block(block_i,p) > 0)
5428  {
5429  offset_recv_block[p] = new int[n_recv_block[p]];
5430  offset_recv_block[p][0] = 0;
5431  nnz_recv_block[p] = new int[n_recv_block[p]];
5432  for (int i = 0; i < n_recv_block[p]; i++)
5433  {
5434  nnz_recv_block[p][i] = 0;
5435  }
5436  unsigned ptr = 0;
5437  nnz_recv_block[p][ptr] += nnz_recv[p][0];
5438  offset_recv_block[p][0]
5439  = row_start_recv[Rows_to_recv_for_get_block(block_i,p)[0]];
5440  for (unsigned i = 1; i < Nrows_to_recv_for_get_block(block_i,p); i++)
5441  {
5442  if (Rows_to_recv_for_get_block(block_i,p)[i] !=
5443  Rows_to_recv_for_get_block(block_i,p)[i-1] + 1)
5444  {
5445  ptr++;
5446  offset_recv_block[p][ptr]
5447  = row_start_recv[Rows_to_recv_for_get_block(block_i,p)[i]];
5448  }
5449  nnz_recv_block[p][ptr] += nnz_recv[p][i];
5450  }
5451  }
5452  delete[] nnz_recv[p];
5453  }
5454 
5455  // post the receives
5456  int* column_index_recv = new int[local_block_nnz];
5457  double* values_recv = new double[local_block_nnz];
5458  Vector<MPI_Request> recv2_req;
5459  for (unsigned p = 0; p < nproc; p++)
5460  {
5461  if (p != my_rank)
5462  {
5463  if (total_nnz_recv_from_proc[p] != 0)
5464  {
5465  // create the datatypes
5466  MPI_Datatype types[2];
5467  MPI_Type_indexed(n_recv_block[p],nnz_recv_block[p],
5468  offset_recv_block[p],MPI_DOUBLE,&types[0]);
5469  MPI_Type_commit(&types[0]);
5470  MPI_Type_indexed(n_recv_block[p],nnz_recv_block[p],
5471  offset_recv_block[p],MPI_INT,&types[1]);
5472  MPI_Type_commit(&types[1]);
5473 
5474  // compute the displacements
5475  MPI_Aint displacements[2];
5476  MPI_Address(values_recv,&displacements[0]);
5477  MPI_Address(column_index_recv,&displacements[1]);
5478  displacements[1] -= displacements[0];
5479  displacements[0] -= displacements[0];
5480 
5481  // compute the block lengths
5482  int length[2];
5483  length[0] = length[1] = 1;
5484 
5485  // create the final datatype
5486  MPI_Datatype final_type;
5487  MPI_Type_struct(2,length,displacements,types,&final_type);
5488  MPI_Type_commit(&final_type);
5489  MPI_Type_free(&types[0]);
5490  MPI_Type_free(&types[1]);
5491 
5492  // and the recv
5493  MPI_Request req;
5494  MPI_Irecv(values_recv,1,final_type,p,1,
5495  this->distribution_pt()->communicator_pt()->mpi_comm(),
5496  &req);
5497  recv2_req.push_back(req);
5498  MPI_Type_free(&final_type);
5499  }
5500  }
5501  else
5502  {
5503  // next send the values and column indices to self
5504  unsigned block_ptr = 0;
5505  unsigned counter = 0;
5506  int nrow_send = Nrows_to_send_for_get_block(block_i,my_rank);
5507  if (nrow_send > 0)
5508  {
5509  unsigned offset = offset_recv_block[my_rank][0];
5510  for (int i = 0; i < nrow_send; i++)
5511  {
5512  if (i > 0)
5513  {
5514  if (Rows_to_recv_for_get_block(block_i,p)[i] !=
5515  Rows_to_recv_for_get_block(block_i,p)[i-1] + 1)
5516  {
5517  counter = 0;
5518  block_ptr++;
5519  offset = offset_recv_block[my_rank][block_ptr];
5520  }
5521  }
5522  unsigned row = Rows_to_send_for_get_block(block_i,my_rank)[i];
5523  for (int r = j_row_start[row]; r < j_row_start[row+1]; r++)
5524  {
5525  if (internal_block_number(j_column_index[r]) == int(block_j))
5526  {
5527  values_recv[offset+counter] = j_value[r];
5528  column_index_recv[offset + counter] =
5529  internal_index_in_block(j_column_index[r]);
5530  counter++;
5531  }
5532  }
5533  }
5534  }
5535  }
5536  }
5537 
5538  // wait for the recv to complete (for the column_index and the values_
5539  c_recv = recv2_req.size();
5540  if (c_recv != 0)
5541  {
5542  MPI_Waitall(c_recv,&recv2_req[0],MPI_STATUS_IGNORE);
5543  }
5544 
5545  // Fill in the compressed row matrix
5546  output_block.build(Internal_block_distribution_pt[block_i]);
5547  output_block.build_without_copy(this->internal_block_dimension(block_j),
5548  local_block_nnz,
5549  values_recv,
5550  column_index_recv,
5551  row_start_recv);
5552 
5553  // wait for the send to complete (nnz / row_start)
5554  int c_send = send_req.size();
5555  if (c_send)
5556  {
5557  MPI_Waitall(c_send,&send_req[0],MPI_STATUS_IGNORE);
5558  }
5559 
5560  // delete temp storage used for assembling data for communication
5561  for (unsigned p = 0; p < nproc; p++)
5562  {
5563  delete[] nnz_send[p];
5564  delete[] column_index_for_proc[p];
5565  delete[] values_for_proc[p];
5566  delete[] offset_recv_block[p];
5567  delete[] nnz_recv_block[p];
5568  }
5569 #else
5570  // throw error
5571  std::ostringstream error_message;
5572  error_message << "The matrix is distributed and on more than one "
5573  << "processor. MPI is required.";
5574  throw OomphLibError(error_message.str(),
5575  OOMPH_CURRENT_FUNCTION,
5576  OOMPH_EXCEPTION_LOCATION);
5577 #endif
5578  }
5579  }
5580 
5581  //=============================================================================
5582  /// \short Gets dof-level block (i,j).
5583  /// If Replacement_dof_block_pt(i,j) is not null, then the replacement
5584  /// block is returned via a deep copy.
5585  ///
5586  /// Otherwise if this is the uppermost block preconditioner then it calls
5587  /// internal_get_block(i,j), else if it is a subsidiary
5588  /// block preconditioner, it will call it's master block preconditioners'
5589  /// get_dof_level_block function.
5590  //=============================================================================
5591  template<>
5593  get_dof_level_block(const unsigned& block_i, const unsigned& block_j,
5594  CRDoubleMatrix& output_block,
5595  const bool& ignore_replacement_block) const
5596  {
5597 #ifdef PARANOID
5598  // the number of dof types.
5599  unsigned para_ndofs = ndof_types();
5600 
5601  // paranoid check that block i is in this block preconditioner
5602  if (block_i >= para_ndofs || block_j >= para_ndofs)
5603  {
5604  std::ostringstream err_msg;
5605  err_msg << "Requested dof block (" << block_i << "," << block_j
5606  << "), however this preconditioner has ndof_types() "
5607  << "= " << para_ndofs << std::endl;
5608  throw OomphLibError(err_msg.str(),
5609  OOMPH_CURRENT_FUNCTION,
5610  OOMPH_EXCEPTION_LOCATION);
5611  }
5612 #endif
5613 
5614  CRDoubleMatrix * tmp_block_pt = Replacement_dof_block_pt.get(block_i,block_j);
5615 
5616  if((tmp_block_pt == 0) || ignore_replacement_block)
5617  {
5618 
5619  // Getting the block from parent preconditioner
5620  const unsigned ndof_in_parent_i = Doftype_coarsen_map_coarse[block_i].size();
5621  const unsigned ndof_in_parent_j = Doftype_coarsen_map_coarse[block_j].size();
5622 
5623  if(ndof_in_parent_i == 1 && ndof_in_parent_j == 1)
5624  {
5625  unsigned parent_dof_i = Doftype_coarsen_map_coarse[block_i][0];
5626  unsigned parent_dof_j = Doftype_coarsen_map_coarse[block_j][0];
5627 
5628  if(is_master_block_preconditioner())
5629  {
5630  internal_get_block(parent_dof_i,parent_dof_j,output_block);
5631  }
5632  else
5633  {
5634  parent_dof_i = Doftype_in_master_preconditioner_coarse[parent_dof_i];
5635  parent_dof_j = Doftype_in_master_preconditioner_coarse[parent_dof_j];
5636 
5637  master_block_preconditioner_pt()->get_dof_level_block(parent_dof_i,
5638  parent_dof_j,
5639  output_block,
5640  ignore_replacement_block);
5641  }
5642  }
5643  else
5644  {
5645 
5646  DenseMatrix<CRDoubleMatrix*> tmp_blocks_pt(ndof_in_parent_i,ndof_in_parent_j,0);
5647 
5648  Vector<Vector<unsigned> > new_block(ndof_in_parent_i,Vector<unsigned>(ndof_in_parent_j,0));
5649 
5650  for (unsigned dof_i = 0; dof_i < ndof_in_parent_i; dof_i++)
5651  {
5652  unsigned parent_dof_i = Doftype_coarsen_map_coarse[block_i][dof_i];
5653  if(is_subsidiary_block_preconditioner())
5654  {
5655  parent_dof_i = Doftype_in_master_preconditioner_coarse[parent_dof_i];
5656  }
5657 
5658  for (unsigned dof_j = 0; dof_j < ndof_in_parent_j; dof_j++)
5659  {
5660  unsigned parent_dof_j = Doftype_coarsen_map_coarse[block_j][dof_j];
5661 
5662  tmp_blocks_pt(dof_i,dof_j) = new CRDoubleMatrix;
5663 
5664  new_block[dof_i][dof_j] = 1;
5665 
5666  if(is_master_block_preconditioner())
5667  {
5668  internal_get_block(parent_dof_i,parent_dof_j,*tmp_blocks_pt(dof_i,dof_j));
5669  }
5670  else
5671  {
5672  parent_dof_j = Doftype_in_master_preconditioner_coarse[parent_dof_j];
5673 
5674  master_block_preconditioner_pt()
5675  ->get_dof_level_block(parent_dof_i,
5676  parent_dof_j,
5677  *tmp_blocks_pt(dof_i,dof_j),
5678  ignore_replacement_block);
5679  }
5680  }
5681  }
5682 
5683  Vector<LinearAlgebraDistribution*> tmp_row_dist_pt(ndof_in_parent_i,0);
5684 
5685  for (unsigned parent_dof_i = 0; parent_dof_i < ndof_in_parent_i; parent_dof_i++)
5686  {
5687  unsigned mapped_dof_i = Doftype_coarsen_map_coarse[block_i][parent_dof_i];
5688 
5689  if(is_master_block_preconditioner())
5690  {
5691  tmp_row_dist_pt[parent_dof_i] = Internal_block_distribution_pt[mapped_dof_i];
5692  }
5693  else
5694  {
5695  mapped_dof_i = Doftype_in_master_preconditioner_coarse[mapped_dof_i];
5696 
5697  tmp_row_dist_pt[parent_dof_i]
5698  = master_block_preconditioner_pt()
5699  ->dof_block_distribution_pt(mapped_dof_i);
5700 
5701  }
5702  }
5703 
5704  Vector<LinearAlgebraDistribution*> tmp_col_dist_pt(ndof_in_parent_j,0);
5705 
5706  for (unsigned parent_dof_j = 0; parent_dof_j < ndof_in_parent_j; parent_dof_j++)
5707  {
5708  unsigned mapped_dof_j = Doftype_coarsen_map_coarse[block_j][parent_dof_j];
5709 
5710  if(is_master_block_preconditioner())
5711  {
5712  tmp_col_dist_pt[parent_dof_j] = Internal_block_distribution_pt[mapped_dof_j];
5713  }
5714  else
5715  {
5716  mapped_dof_j = Doftype_in_master_preconditioner_coarse[mapped_dof_j];
5717  tmp_col_dist_pt[parent_dof_j]
5718  = master_block_preconditioner_pt()
5719  ->dof_block_distribution_pt(mapped_dof_j);
5720 
5721  }
5722  }
5723 
5725  tmp_col_dist_pt,
5726  tmp_blocks_pt,
5727  output_block);
5728 
5729  for (unsigned dof_i = 0; dof_i < ndof_in_parent_i; dof_i++)
5730  {
5731  for (unsigned dof_j = 0; dof_j < ndof_in_parent_j; dof_j++)
5732  {
5733  if(new_block[dof_i][dof_j])
5734  {
5735  delete tmp_blocks_pt(dof_i,dof_j);
5736  }
5737  }
5738  }
5739  }
5740 
5741  }
5742  else
5743  {
5744  CRDoubleMatrixHelpers::deep_copy(tmp_block_pt,output_block);
5745  }
5746  }
5747 
5748 //=============================================================================
5749 /// \short test function to check that every element in the block matrix
5750 /// (block_i,block_j) matches the corresponding element in the original matrix
5751 //=============================================================================
5752  template<typename MATRIX> void BlockPreconditioner<MATRIX>::
5753  block_matrix_test(const unsigned& block_i, const unsigned& block_j,
5754  const MATRIX* block_matrix_pt) const
5755  {
5756 
5757  // boolean flag to indicate whether test is passed
5758  bool check = true;
5759 
5760  // number of rows in matrix
5761  unsigned n_row = matrix_pt()->nrow();
5762 
5763  // number of columns in matrix
5764  unsigned n_col = matrix_pt()->ncol();
5765 
5766  // loop over rows of original matrix
5767  for (unsigned i = 0; i < n_row; i++)
5768  {
5769 
5770  // if this coefficient is associated with a block in this block
5771  // preconditioner
5772  if (static_cast<int>(block_i) == this->internal_block_number(i))
5773  {
5774 
5775  // loop over columns of original matrix
5776  for (unsigned j = 0; j < n_col; j++)
5777  {
5778 
5779  // if the coeeficient is associated with a block in this block
5780  // preconditioner
5781  if (static_cast<int>(block_j) == this->internal_block_number(j))
5782  {
5783 
5784  // check whether elements in original matrix and matrix of block
5785  // pointers match
5786  if ( matrix_pt()->operator()(i,j) !=
5787  block_matrix_pt
5788  ->operator()(internal_index_in_block(i),internal_index_in_block(j)) )
5789  {
5790  check = false;
5791  }
5792  }
5793  }
5794  }
5795  }
5796 
5797  // throw error
5798  if (!check)
5799  {
5800  std::ostringstream error_message;
5801  error_message << "The require elements have not been successfully copied"
5802  << " from the original matrix to the block matrices";
5803  throw OomphLibError(error_message.str(),
5804  OOMPH_CURRENT_FUNCTION,
5805  OOMPH_EXCEPTION_LOCATION);
5806  }
5807  }
5808 
5809 
5810  template class BlockPreconditioner<CRDoubleMatrix>;
5811 
5812 } // Namespace: oomph
5813 
int * column_index()
Access to C-style column index array.
Definition: matrices.h:1056
bool built() const
unsigned first_row() const
access function for the first row on this processor. If not distributed then this is just zero...
void internal_get_block_vector(const unsigned &n, const DoubleVector &v, DoubleVector &b) const
A helper function, takes the naturally ordered vector, v, and extracts the n-th block vector...
double * values_pt()
access function to the underlying values
cstr elem_len * i
Definition: cfortran.h:607
unsigned nrow() const
access function to the number of global rows.
unsigned nrow_local() const
access function for the num of local rows on this processor. If no MPI then Nrow is returned...
void return_concatenated_block_vector(const Vector< unsigned > &block_vec_number, const DoubleVector &b, DoubleVector &v) const
Takes concatenated block ordered vector, b, and copies its entries to the appropriate entries in the ...
void return_block_vector(const unsigned &n, const DoubleVector &b, DoubleVector &v) const
Takes the n-th block ordered vector, b, and copies its entries to the appropriate entries in the natu...
e
Definition: cfortran.h:575
bool distributed() const
access function to the distributed - indicates whether the distribution is serial or distributed ...
double * value()
Access to C-style value array.
Definition: matrices.h:1062
void get_block_vectors(const Vector< unsigned > &block_vec_number, const DoubleVector &v, Vector< DoubleVector > &s) const
Takes the naturally ordered vector and rearranges it into a vector of sub vectors corresponding to th...
void concatenate(const Vector< LinearAlgebraDistribution * > &in_distribution_pt, LinearAlgebraDistribution &out_distribution)
Takes a vector of LinearAlgebraDistribution objects and concatenates them such that the nrow_local of...
void get_block_ordered_preconditioner_vector(const DoubleVector &v, DoubleVector &w)
Given the naturally ordered vector, v, return the vector rearranged in block order in w...
void concatenate_without_communication(const Vector< DoubleVector * > &in_vector_pt, DoubleVector &out_vector)
Concatenate DoubleVectors. Takes a Vector of DoubleVectors. If the out vector is built, we will not build a new distribution. Otherwise a new distribution will be built using LinearAlgebraDistribution::concatenate(...).
void split_without_communication(const DoubleVector &in_vector, Vector< DoubleVector * > &out_vector_pt)
Split a DoubleVector into the out DoubleVectors. Data stays on its current processor, no data is sent between processors. This results in our vectors which are a permutation of the in vector.
unsigned long nrow() const
Return the number of rows of the matrix.
Definition: matrices.h:504
LinearAlgebraDistribution * distribution_pt() const
access to the LinearAlgebraDistribution
void return_block_ordered_preconditioner_vector(const DoubleVector &w, DoubleVector &v) const
Takes the block ordered vector, w, and reorders it in natural order. Reordered vector is returned in ...
unsigned long nnz() const
Return the number of nonzero entries (the local nnz)
Definition: matrices.h:1068
void internal_return_block_vectors(const Vector< unsigned > &block_vec_number, const Vector< DoubleVector > &s, DoubleVector &v) const
A helper function, takes the vector of block vectors, s, and copies its entries into the naturally or...
Describes the distribution of a distributable linear algebra type object. Typically this is a contain...
void build(const DoubleVector &old_vector)
Just copys the argument DoubleVector.
void deep_copy(const CRDoubleMatrix *const in_matrix_pt, CRDoubleMatrix &out_matrix)
Create a deep copy of the matrix pointed to by in_matrix_pt.
Definition: matrices.h:3244
void get_concatenated_block_vector(const Vector< unsigned > &block_vec_number, const DoubleVector &v, DoubleVector &b)
Takes the naturally ordered vector and extracts the blocks indicated by the block number (the values)...
virtual void block_setup()
Determine the size of the matrix blocks and setup the lookup schemes relating the global degrees of f...
static char t char * s
Definition: cfortran.h:572
unsigned long ncol() const
Return the number of columns of the matrix.
Definition: matrices.h:507
void initialise(const _Tp &__value)
Iterate over all values and set to the desired value.
Definition: Vector.h:171
void turn_into_subsidiary_block_preconditioner(BlockPreconditioner< MATRIX > *master_block_prec_pt, const Vector< unsigned > &doftype_in_master_preconditioner_coarse)
Function to turn this preconditioner into a subsidiary preconditioner that operates within a bigger "...
std::string string(const unsigned &i)
Return the i-th string or "" if the relevant string hasn't been defined.
void get_blocks(DenseMatrix< bool > &required_blocks, DenseMatrix< MATRIX * > &block_matrix_pt) const
Get all the block matrices required by the block preconditioner. Takes a pointer to a matrix of bools...
static long Is_unclassified
Static "Magic number" used in place of the equation number to denote a value that hasn't been class...
Definition: nodes.h:201
void return_block_vectors(const Vector< unsigned > &block_vec_number, const Vector< DoubleVector > &s, DoubleVector &v) const
Takes the vector of block vectors, s, and copies its entries into the naturally ordered vector...
OomphCommunicator * communicator_pt() const
const access to the communicator pointer
A vector in the mathematical sense, initially developed for linear algebra type applications. If MPI then this vector can be distributed - its distribution is described by the LinearAlgebraDistribution object at Distribution_pt. Data is stored in a C-style pointer vector (double*)
Definition: double_vector.h:61
void internal_return_block_ordered_preconditioner_vector(const DoubleVector &w, DoubleVector &v) const
Takes the block ordered vector, w, and reorders it in the natural order. Reordered vector is returned...
void clear()
clears the distribution
void internal_return_block_vector(const unsigned &n, const DoubleVector &b, DoubleVector &v) const
Takes the n-th block ordered vector, b, and copies its entries to the appropriate entries in the natu...
int * row_start()
Access to C-style row_start array.
Definition: matrices.h:1050
void get_block_vector(const unsigned &n, const DoubleVector &v, DoubleVector &b) const
Takes the naturally ordered vector, v and returns the n-th block vector, b. Here n is the block numbe...
void internal_get_block_ordered_preconditioner_vector(const DoubleVector &v, DoubleVector &w) const
Given the naturally ordered vector, v, return the vector rearranged in block order in w...
A class for compressed row matrices. This is a distributable object.
Definition: matrices.h:872
void internal_get_block_vectors(const Vector< unsigned > &block_vec_number, const DoubleVector &v, Vector< DoubleVector > &s) const
Takes the naturally ordered vector and rearranges it into a vector of sub vectors corresponding to th...
void build(const LinearAlgebraDistribution *distribution_pt, const unsigned &ncol, const Vector< double > &value, const Vector< int > &column_index, const Vector< int > &row_start)
build method: vector of values, vector of column indices, vector of row starts and number of rows and...
Definition: matrices.cc:1692
void internal_get_block(const unsigned &i, const unsigned &j, MATRIX &output_block) const
Gets block (i,j) from the matrix pointed to by Matrix_pt and returns it in output_block. This is associated with the internal blocks. Please use the other get_block(...) function.
void build_without_copy(const unsigned &ncol, const unsigned &nnz, double *value, int *column_index, int *row_start)
keeps the existing distribution and just matrix that is stored without copying the matrix data ...
Definition: matrices.cc:1731