Disk ARchive  2.7.14
Full featured and portable backup and archiving tool
parallel_tronconneuse.hpp
Go to the documentation of this file.
1 /*********************************************************************/
2 // dar - disk archive - a backup/restoration program
3 // Copyright (C) 2002-2024 Denis Corbin
4 //
5 // This program is free software; you can redistribute it and/or
6 // modify it under the terms of the GNU General Public License
7 // as published by the Free Software Foundation; either version 2
8 // of the License, or (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU General Public License
16 // along with this program; if not, write to the Free Software
17 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 //
19 // to contact the author, see the AUTHOR file
20 /*********************************************************************/
21 
25 
34 
35 #ifndef PARALLEL_TRONCONNEUSE_HPP
36 #define PARALLEL_TRONCONNEUSE_HPP
37 
38 #include "../my_config.h"
39 #include <string>
40 
41 #include "infinint.hpp"
42 #include "archive_version.hpp"
43 #include "crypto_segment.hpp"
44 #include "heap.hpp"
45 #include "crypto_module.hpp"
46 #include "proto_tronco.hpp"
47 
48 #include <libthreadar/libthreadar.hpp>
49 
50 namespace libdar
51 {
52 
55 
56  // those classes are used by the parallel_tronconneuse class to wrap the different
57  // types of threads. They are defined just after the parallel_tronconneuse definition
class read_below;    // sub-thread reading encrypted data for the workers
class write_below;   // sub-thread writing down data produced by the workers
class crypto_worker; // sub-thread ciphering/deciphering blocks

/// status flags exchanged between parallel_tronconneuse and its sub-threads
///
/// The numeric values are explicit and must stay stable.
enum class tronco_flags
{
    normal = 0,
    stop = 1,
    eof = 2,
    die = 3,
    data_error = 4,
    exception_below = 5,
    exception_worker = 6,
    exception_error = 7
};
65 
66 
68  //
69  // the parallel_tronconneuse class that orchestrate all that
70  //
71  //
72 
73 
75 
87 
// NOTE(review): this listing is a Doxygen rendering; the leading number on each line is
// the original header line number, and several original lines (mostly documentation,
// but also some declarations/fields) are not shown here.
//
// parallel_tronconneuse: partial implementation of the generic_file interface that
// ciphers/deciphers data block by block. The work is spread over crypto_worker threads
// ("travailleur"), fed by a read_below thread and drained by a write_below thread.
88  class parallel_tronconneuse : public proto_tronco
89  {
90  public:
92 
// constructor
// NOTE(review): original line 103 is missing from this view; the declaration summary
// gives the full signature as (U_I workers, U_32 block_size, generic_file & encrypted_side,
// const archive_version & reading_ver, std::unique_ptr<crypto_module> & ptr).
100  parallel_tronconneuse(U_I workers,
101  U_32 block_size,
102  generic_file & encrypted_side,
104  std::unique_ptr<crypto_module> & ptr);
105 
// copy construction is forbidden (copy assignment is deleted too per the declaration
// summary; original lines 108-115 are not shown here)
107  parallel_tronconneuse(const parallel_tronconneuse & ref) = delete;
108 
111 
114 
// NOTE(review): this defaulted move assignment moves the sub-thread objects
// (travailleur, crypto_reader, crypto_writer) along with the shared structures.
// crypto_worker stores *references* to the shared_ptr objects it is constructed with.
// If those are this object's scatter/gather members (presumably - confirm in the .cpp),
// moving an instance whose workers exist leaves them referring to moved-from members.
// Confirm this operator is never applied to a live object, or consider deleting it.
116  parallel_tronconneuse & operator = (parallel_tronconneuse && ref) noexcept = default;
117 
// destructor
119  ~parallel_tronconneuse() noexcept;
120 
// skip interface inherited from generic_file
122  virtual bool skippable(skippability direction, const infinint & amount) override;
124  virtual bool skip(const infinint & pos) override;
126  virtual bool skip_to_eof() override;
128  virtual bool skip_relative(S_I x) override;
// truncation is never supported: always answers false
130  virtual bool truncatable(const infinint & pos) const override { return false; };
// current position from the upper layer perspective; calling it once terminated is a bug
132  virtual infinint get_position() const override { if(is_terminated()) throw SRC_BUG; return current_position; };
133 
135 
// in write mode, signals end of file by flushing pending data through sync_write();
// calling it once terminated is a bug
140  virtual void write_end_of_file() override { if(is_terminated()) throw SRC_BUG; sync_write(); };
141 
142 
144 
// modifies the offset in the below layer at which the encrypted data starts
145  virtual void set_initial_shift(const infinint & x) override;
146 
// sets the callback used to find where trailing clear data begins
150  virtual void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) override;
151 
// returns the clear block size given to the constructor
153  virtual U_32 get_clear_block_size() const override { return clear_block_size; };
154 
155  private:
156 
157  // inherited from generic_file
158 
160  virtual void inherited_read_ahead(const infinint & amount) override;
161 
163  virtual U_I inherited_read(char *a, U_I size) override;
164 
166 
168  virtual void inherited_write(const char *a, U_I size) override;
169 
171 
// truncation is not supported (see truncatable()): reaching this is a bug
174  virtual void inherited_truncate(const infinint & pos) override { throw SRC_BUG; };
175 
177  virtual void inherited_sync_write() override;
178 
179 
181  virtual void inherited_flush_read() override;
182 
184  virtual void inherited_terminate() override;
185 
// archive format version followed (the reading_ver field, not shown in this view)
186  const archive_version & get_reading_version() const { return reading_ver; };
187 
188  // internal data structure
// possible states of the sub-threads (stored in t_status)
189  enum class thread_status { running, suspended, dead };
190 
191  // the fields
192 
// crypto module used to cipher / decipher blocks of data
198  std::unique_ptr<crypto_module> crypto;
// same signature as trailing_clear_data_callback; presumably the callback given to
// set_callback_trailing_clear_data() - confirm in the .cpp
199  infinint (*mycallback)(generic_file & below, const archive_version & reading_ver);
// below layer holding the encrypted data (raw, non-owning pointer)
200  generic_file* encrypted;
201 
202  // fields used to represent possible status of subthreads and communication channel (the pipe)
203 
// whether child threads are waiting for us on the barrier
205  thread_status t_status;
206 
207 
208  // the following stores data from the ratelier_gather to be provided for read() operation
209  // the lus_data/lus_flags is what is extracted from the ratelier_gather, both constitute
210  // the feedback channel from sub-threads to provide order acks and normal data
// ("lus" is French for "read")
211 
212  std::deque<std::unique_ptr<crypto_segment> > lus_data;
213  std::deque<signed int> lus_flags;
214  bool lus_eof;
216 
217  // the following stores data going to ratelier_scatter for the write() operation
218 
219  std::unique_ptr<crypto_segment> tempo_write;
220  infinint block_num;
221 
222  // the datastructures shared among threads
// ("tas" is French for "heap")
223 
224  std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > scatter;
225  std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > gather;
226  std::shared_ptr<libthreadar::barrier> waiter;
227  std::shared_ptr<heap<crypto_segment> > tas;
228 
229  // the child threads
// ("travailleur" is French for "worker")
230 
231  std::deque<std::unique_ptr<crypto_worker> > travailleur;
232  std::unique_ptr<read_below> crypto_reader;
233  std::unique_ptr<write_below> crypto_writer;
234 
235 
236 
238 
// sends an order to the sub-threads and gathers their acks
249  bool send_read_order(tronco_flags order, const infinint & for_offset = 0);
250 
// sends an order in write mode
252  void send_write_order(tronco_flags order);
253 
// wakes up the threads in read mode when necessary
255  void go_read();
256 
// fills lus_data/lus_flags from the ratelier_gather if these are empty
258  void read_refill();
259 
261 
268 
270 
// removes the pending stop acks (ignore_stop_acks) from the pipe
279  bool purge_unack_stop_order(const infinint & pos = 0);
280 
282 
// flushes lus_data/lus_flags until the requested offset is found or all data has been removed
291  bool find_offset_in_lus_data(const infinint & pos);
292 
// resets the inter-thread data structures and launches the threads
294  void run_threads();
295 
// ends the threads, accounting for the fact they may be suspended on the barrier
297  void stop_threads();
298 
// called by join_threads(); only simplifies exception handling there
300  void join_workers_only();
301 
// waits for the threads to finish and rethrows their exceptions in the current thread
303  void join_threads();
304 
305 
// ratelier size is 1.5 times the number of workers (integer division, rounded down)
306  static U_I get_ratelier_size(U_I num_worker) { return num_worker + num_worker/2; };
307  static U_I get_heap_size(U_I num_worker);
308  };
309 
310 
312  //
313  // read_below subthread used by parallel_tronconneuse
314  // to dispatch chunks of encrypted data to the workers
315  //
316 
317  class read_below: public libthreadar::thread
318  {
319  public:
320  read_below(const std::shared_ptr<libthreadar::ratelier_scatter<crypto_segment> > & to_workers,
321  const std::shared_ptr<libthreadar::barrier> & waiter,
322  U_I num_workers,
323  U_I clear_block_size,
324  generic_file* encrypted_side,
325  const std::shared_ptr<heap<crypto_segment> > xtas,
326  infinint init_shift):
327  workers(to_workers),
328  waiting(waiter),
329  num_w(num_workers),
330  clear_buf_size(clear_block_size),
331  encrypted(encrypted_side),
332  tas(xtas),
333  initial_shift(init_shift),
334  reof(false),
335  trailing_clear_data(nullptr)
336  { flag = tronco_flags::normal; };
337 
338  ~read_below() { if(ptr) tas->put(move(ptr)); kill(); join(); };
339 
343  void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) { trailing_clear_data = call_back; };
344 
345  // *** //
346  // *** the method above should not be used anymore once the thread is running *** //
347  // *** //
348 
350  void set_initial_shift(const infinint & x) { initial_shift = x; };
351 
353 
358  void set_pos(const infinint & pos) { skip_to = pos; };
359 
361 
370  void set_flag(tronco_flags val) { flag = val; };
371 
373 
378  const infinint & get_clear_flow_start() const { return clear_flow_start; };
379 
381 
386  const infinint & get_pos_in_flow() const { return pos_in_flow; };
387 
388 
389  protected:
390  virtual void inherited_run() override;
391 
392  private:
393  std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > workers;
394  std::shared_ptr<libthreadar::barrier> waiting;
395  U_I num_w;
396  U_I clear_buf_size;
397  generic_file* encrypted;
398  archive_version version;
399  std::shared_ptr<heap<crypto_segment> > tas;
400  infinint initial_shift;
401  bool reof;
402  trailing_clear_data_callback trailing_clear_data;
403  std::unique_ptr<crypto_segment> ptr;
404  infinint index_num;
405 
406 
407  // initialized by inherited_run() / get_ready_for_new_offset()
408 
409  infinint crypt_offset;
410  U_I encrypted_buf_size;
411 
412  // fields accessible by both the caller and the read_below thread
413 
414  infinint skip_to;
415  tronco_flags flag;
416  infinint clear_flow_start;
417  infinint pos_in_flow;
418 
419  void work();
420  infinint get_ready_for_new_offset();
421  void send_flag_to_workers(tronco_flags theflag);
422 
423  // same function as the tronconneuse::position_clear2crypt
424  void position_clear2crypt(const infinint & pos,
425  infinint & file_buf_start,
426  infinint & clear_buf_start,
427  infinint & pos_in_buf,
428  infinint & block_num);
429 
430  };
431 
432 
434  //
435  // write_below subthread used by parallel_tronconneuse
436  // to gather and write down encrypted data work from workers
437  //
438 
439  class write_below: public libthreadar::thread
440  {
441  public:
442  write_below(const std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > & from_workers,
443  const std::shared_ptr<libthreadar::barrier> & waiter,
444  U_I num_workers,
445  generic_file* encrypted_side,
446  const std::shared_ptr<heap<crypto_segment> > xtas):
447  workers(from_workers),
448  waiting(waiter),
449  num_w(num_workers),
450  cur_num_w(0),
451  encrypted(encrypted_side),
452  tas(xtas),
453  error(false),
454  error_block(0)
455  { if(encrypted == nullptr) throw SRC_BUG; };
456 
457  ~write_below() { kill(); join(); };
458 
459  bool exception_pending() const { return error; };
460  const infinint & get_error_block() const { return error_block; };
461 
462  protected:
463  virtual void inherited_run() override;
464 
465  private:
466  std::shared_ptr<libthreadar::ratelier_gather<crypto_segment> > workers;
467  std::shared_ptr<libthreadar::barrier> waiting;
468  U_I num_w;
469  U_I cur_num_w;
470  generic_file* encrypted;
471  std::shared_ptr<heap<crypto_segment> > tas;
472  bool error;
473  infinint error_block; // last crypto block before error
474  std::deque<std::unique_ptr<crypto_segment> >ones;
475  std::deque<signed int> flags;
476 
477  void work();
478  };
479 
480 
482  //
483  // the crypto_worker threads performing ciphering/deciphering
484  // of many data blocks in parallel
485  //
486 
487 
488  class crypto_worker: public libthreadar::thread
489  {
490  public:
491  crypto_worker(std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & read_side,
492  std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & write_side,
493  std::shared_ptr<libthreadar::barrier> waiter,
494  std::unique_ptr<crypto_module> && ptr,
495  bool encrypt):
496  reader(read_side),
497  writer(write_side),
498  waiting(waiter),
499  crypto(move(ptr)),
500  do_encrypt(encrypt),
501  abort(status::fine)
502  { if(!reader || !writer || !waiting || !crypto) throw SRC_BUG; };
503 
504  virtual ~crypto_worker() { kill(); join(); };
505 
506  protected:
507  virtual void inherited_run() override;
508 
509  private:
510  enum class status { fine, inform, sent };
511 
512  std::shared_ptr<libthreadar::ratelier_scatter <crypto_segment> > & reader;
513  std::shared_ptr<libthreadar::ratelier_gather <crypto_segment> > & writer;
514  std::shared_ptr<libthreadar::barrier> waiting;
515  std::unique_ptr<crypto_module> crypto;
516  bool do_encrypt; // if false do decrypt
517  std::unique_ptr<crypto_segment> ptr;
518  unsigned int slot;
519  status abort;
520 
521  void work();
522  };
523 
524 
526 
527 } // end of namespace
528 
529 #endif
virtual void write_end_of_file() override
in write_only mode indicates that end of file is reached
defines the unit block of information ciphered at once
virtual bool skip(const infinint &pos) override
inherited from generic_file
parallel_tronconneuse & operator=(const parallel_tronconneuse &ref)=delete
assignment operator
thread_status t_status
whether child threads are waiting for us on the barrier
virtual U_I inherited_read(char *a, U_I size) override
this protected inherited method is now private for inherited classes of tronconneuse ...
bool purge_unack_stop_order(const infinint &pos=0)
removing the ignore_stop_acks pending on the pipe
virtual void inherited_write(const char *a, U_I size) override
inherited from generic_file
virtual void inherited_truncate(const infinint &pos) override
this protected inherited method is now private for inherited classes of tronconneuse ...
void join_workers_only()
called by join_threads(); just a code simplification around exception handling
virtual void inherited_sync_write() override
this protected inherited method is now private for inherited classes of tronconneuse ...
virtual bool skippable(skippability direction, const infinint &amount) override
inherited from generic_file
void join_threads()
wait for threads to finish and eventually rethrow their exceptions in current thread ...
bool send_read_order(tronco_flags order, const infinint &for_offset=0)
send an order to subthreads and gather acks from them
virtual bool truncatable(const infinint &pos) const override
inherited from generic_file
bool is_terminated() const
virtual bool skip_to_eof() override
inherited from generic_file
virtual infinint get_position() const override
inherited from generic_file
virtual void inherited_read_ahead(const infinint &amount) override
this protected inherited method is now private for inherited classes of tronconneuse ...
U_32 clear_block_size
size of a clear block
U_I num_workers
number of worker threads
tronco_flags purge_ratelier_from_next_order(infinint pos=0)
purge the ratelier from the next order which is provided as returned value
~parallel_tronconneuse() noexcept
destructor
virtual bool skip_relative(S_I x) override
inherited from generic_file
infinint(* trailing_clear_data_callback)(generic_file &below, const archive_version &reading_ver)
the trailing_clear_data_callback call back is a means by which the upper layer can tell when encrypted...
this is a partial implementation of the generic_file interface to cipher/decipher data block by block...
bool check_bytes_to_skip
whether to check for bytes to skip
void run_threads()
reset the interthread datastructure and launch the threads
U_I ignore_stop_acks
how many stop acks are still to be read (aborted stop order context)
virtual void inherited_terminate() override
this protected inherited method is now private for inherited classes of tronconneuse ...
heap data structure (relying on FIFO)
std::unique_ptr< crypto_module > crypto
the crypto module used to cipher / decipher blocks of data
infinint current_position
current position for the upper layer perspective (modified by skip*, inherited_read/write, find_offset_in_lus_data)
void go_read()
wake up threads in read mode when necessary
switch module to limitint (32 or 64 bit integers) or infinint
virtual U_32 get_clear_block_size() const override
returns the block size given to constructor
void stop_threads()
end threads taking into account the fact they may be suspended on the barrier
per block cryptography implementation used for strong encryption.
this is the interface class from which all other data transfer classes inherit
parallel_tronconneuse(U_I workers, U_32 block_size, generic_file &encrypted_side, const archive_version &reading_ver, std::unique_ptr< crypto_module > &ptr)
This is the constructor.
virtual void inherited_flush_read() override
this protected inherited method is now private for inherited classes of tronconneuse ...
class archive_version that rules which archive format to follow
defines common interface for tronconneuse and parallel_tronconneuse
void sync_write()
write any pending data
bool find_offset_in_lus_data(const infinint &pos)
flush lus_data/lus_flags until the requested pos offset is found or all data has been removed ...
the arbitrary large positive integer class
class archive_version manages the version of the archive format
void send_write_order(tronco_flags order)
send order in write mode
virtual void set_initial_shift(const infinint &x) override
this method modifies the initial shift. This overrides the constructor "no_initial_shift" of the con...
void read_refill()
fill lus_data/lus_flags from ratelier_gather if these are empty
virtual void set_callback_trailing_clear_data(trailing_clear_data_callback call_back) override
libdar namespace encapsulates all libdar symbols
Definition: archive.hpp:46
tronco_flags
status flags used between parallel_tronconneuse and its sub-threads
archive_version reading_ver
archive format we follow
infinint initial_shift
the offset in the "encrypted" below layer at which starts the encrypted data