ArnLib  4.0.x
Active Registry Network
ArnSync.cpp
Go to the documentation of this file.
1 // Copyright (C) 2010-2022 Michael Wiklund.
2 // All rights reserved.
3 // Contact: arnlib@wiklunden.se
4 //
5 // This file is part of the ArnLib - Active Registry Network.
6 // Parts of ArnLib depend on Qt and/or other libraries that have their own
7 // licenses. Usage of these other libraries is subject to their respective
8 // license agreements.
9 //
10 // GNU Lesser General Public License Usage
11 // This file may be used under the terms of the GNU Lesser General Public
12 // License version 2.1 as published by the Free Software Foundation and
13 // appearing in the file LICENSE_LGPL.txt included in the packaging of this
14 // file. In addition, as a special exception, you may use the rights described
15 // in the Nokia Qt LGPL Exception version 1.1, included in the file
16 // LGPL_EXCEPTION.txt in this package.
17 //
18 // GNU General Public License Usage
19 // Alternatively, this file may be used under the terms of the GNU General Public
20 // License version 3.0 as published by the Free Software Foundation and appearing
21 // in the file LICENSE_GPL.txt included in the packaging of this file.
22 //
23 // Other Usage
24 // Alternatively, this file may be used in accordance with the terms and conditions
25 // contained in a signed written agreement between you and Michael Wiklund.
26 //
27 // This program is distributed in the hope that it will be useful, but WITHOUT ANY
28 // WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
29 // PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
30 //
31 
32 #include "ArnSync.hpp"
33 #include "ArnSyncLogin.hpp"
34 #include "ArnItemNet.hpp"
35 #include "ArnLink.hpp"
36 #include "ArnInc/ArnClient.hpp"
37 #include "ArnInc/ArnMonEvent.hpp"
38 #include "ArnInc/ArnEvent.hpp"
39 #include "ArnInc/Arn.hpp"
40 #include "ArnInc/ArnLib.hpp"
41 #include "ArnInc/ArnCompat.hpp"
42 #include <QTcpSocket>
43 #include <QString>
44 #include <QStringList>
45 #include <QDebug>
46 #include <limits.h>
47 
48 #define ARNSYNCVER "4.0"
49 
50 using Arn::XStringMap;
51 
52 
53 ArnSync::ArnSync( QTcpSocket *socket, bool isClientSide, QObject *parent)
54  : QObject( parent)
55 {
56  _socket = socket; // Note: ArnSync does not own socket ...
57  _sessionHandler = arnNullptr;
58  _toRemotePathCB = &nullConvertPath;
59  _arnLogin = arnNullptr;
60  _isClientSide = isClientSide;
61  _state = State::Init;
62  _isSending = false;
63  _isClosed = isClientSide; // Server start as not closed
64  _queueNumCount = 0;
65  _queueNumDone = 0;
66  _isConnectStarted = !isClientSide; // Server start as connection started
67  _isConnected = false;
68  _isDemandLogin = false;
69  _remoteVer[0] = 1; // Default version 1.0
70  _remoteVer[1] = 0;
71  _loginReqCode = 0;
72  _loginNextSeq = 0;
73  _loginSalt1 = 0;
74  _loginSalt2 = 0;
75  _trafficIn = 0;
76  _trafficOut = 0;
77  _clientSyncMode = Arn::ClientSyncMode::Invalid;
78  _allow = _isClientSide ? Arn::Allow::All : Arn::Allow::None;
79  _remoteAllow = Arn::Allow::None;
80  _freePathTab += Arn::fullPath( Arn::pathLocalSys + "Legal/");
81  _dataRemain.clear();
82 }
83 
84 
85 ArnSync::~ArnSync()
86 {
87  qDeleteAll( _itemNetMap);
88  qDeleteAll( _fluxRecPool);
89  qDeleteAll( _fluxPipeQueue);
90 }
91 
92 
93 void ArnSync::setArnLogin( ArnSyncLogin* arnLogin)
94 {
95  _arnLogin = arnLogin;
96 }
97 
98 
99 void ArnSync::start()
100 {
101  connect( _socket, SIGNAL(disconnected()), this, SLOT(disConnected()));
102  connect( _socket, SIGNAL(readyRead()), this, SLOT(socketInput()));
103  connect( _socket, SIGNAL(bytesWritten(qint64)), this, SLOT(sendNext()));
104  connect( &_loginDelayTimer, SIGNAL(timeout()), this, SLOT(doLoginSeq0End()));
105 
106  if (_isClientSide) {
107  _isConnected = false;
108  _remoteAllow = Arn::Allow::All; // No restrictions until known server permisions
109  }
110  else {
111  _isConnected = true;
112  if (_isDemandLogin) {
113  _allow = Arn::Allow::None;
114  _remoteAllow = Arn::Allow::None;
115  }
116  else {
117  _allow = Arn::Allow::All;
118  _remoteAllow = Arn::Allow::All;
119  setState( State::Normal);
120  }
121  }
122 }
123 
124 
// Enter State::Normal: rebuild the sync and mode queues from every item in
// _itemNetMap, decide the initial master/slave role of each item from the
// client sync mode, queue the current value of items that are master at
// start, and finally kick the sender.  Does nothing when already in
// State::Normal.
// NOTE(review): the embedded listing numbers jump 155->157, 163->165 and
// 173->175 inside the switch below, i.e. the "case" labels are not present
// in this copy of the file - restore them from the real source before use.
125 void ArnSync::startNormalSync()
126 {
127  if (_state == State::Normal) return; // Already in normal state (has done sync)
128 
129  // qDebug() << "StartNormalSync:";
130  clearNonPipeQueues();
131 
133  ArnItemNet* itemNet;
134  QByteArray mode;
135  QMapIterator<uint,ArnItemNet*> i( _itemNetMap);
136  while (i.hasNext()) {
137  i.next();
138  itemNet = i.value();
139 
 // Start from a clean dirty state, the queues were just cleared
140  itemNet->resetDirtyValue();
141  itemNet->resetDirtyMode();
142 
143  _syncQueue.enqueue( itemNet);
144  mode = itemNet->getModeString();
145  // If non default mode that isn't already in modeQueue
146  if (!mode.isEmpty() && !itemNet->isDirtyMode()) {
147  _modeQueue.enqueue( itemNet);
148  }
149 
150  bool isMaster = itemNet->isMaster();
151  bool isNull = itemNet->type() == Arn::DataType::Null;
152  bool isIniMaster = false;
153  bool isIniSlave = false;
154 
 // Remote versions below 3 get neither iniMaster nor iniSlave in any branch
155  switch (_clientSyncMode) {
157  {
158  if (_remoteVer[0] < 3)
159  break;
160  isIniMaster = itemNet->localUpdateSinceStop() > 0;
161  isIniSlave = isMaster && isNull;
162  break;
163  }
 // An item that is not yet master becomes master when it has local updates
165  if (!isMaster) { // Already is Master
166  if (itemNet->localUpdateCount() > 0) {
167  itemNet->setMaster();
168  }
169  }
170  if (_remoteVer[0] < 3)
171  break;
172  isIniSlave = isMaster && isNull && itemNet->isSaveMode();
173  break;
175  if (_remoteVer[0] < 3)
176  break;
177  isIniSlave = isMaster && isNull && itemNet->isSaveMode();
178  break;
179  default:
180  break;
181  }
182 
183  itemNet->setIniMaster( isIniMaster);
184  itemNet->setIniSlave( isIniSlave);
185  bool isMasterStart = itemNet->isMasterAtStart();
186  bool isValueBlocked = itemNet->isPipeMode() || // Dont Sync itemNet value to Pipe
187  itemNet->isFolder();
188  if (isMasterStart && !isValueBlocked) {
189  itemNet->resetEchoSeq();
190  itemNet->setSyncFlux( true);
191  itemValueUpdater( ArnLinkHandle::null(), arnNullptr, itemNet); // Make client send the current value to server
192  }
193  }
194  setState( State::Normal);
195 
196  sendNext();
197 }
198 
199 
200 void ArnSync::sendXSMap( const XStringMap& xsMap)
201 {
202  send( xsMap.toXString());
203 }
204 
205 
206 void ArnSync::send( const QByteArray& xString)
207 {
208  if (!_isConnected) {
209  return;
210  }
211 
212  QByteArray sendString;
213  sendString += xString;
214  if (Arn::debugRecInOut) qDebug() << "Rec-Out: " << sendString;
215  sendString += "\r\n";
216  _socket->write( sendString);
217  _trafficOut += quint32( sendString.size());
218 }
219 
220 
221 void ArnSync::sendNoSync( const QString& path)
222 {
223  XStringMap xm;
224  xm.add(ARNRECNAME, "nosync").add("path", path);
225 
226  // qDebug() << "ArnSync-nosync: path=" << path;
227  sendXSMap( xm);
228 }
229 
230 
231 void ArnSync::sendSetTree( const QString& path)
232 {
233  if (!_remoteAllow.is( _allow.Write)) return;
234 
235  XStringMap xm;
236  xm.add(ARNRECNAME, "set").add("path", path);
237 
238  // qDebug() << "ArnSync-set-tree: path=" << path;
239  sendXSMap( xm);
240 }
241 
242 
243 void ArnSync::sendDelete( const QString& path)
244 {
245  if (!_remoteAllow.is( _allow.Delete)) return;
246 
247  XStringMap xm;
248  xm.add(ARNRECNAME, "delete").add("path", path);
249 
250  // qDebug() << "ArnSync-delete: path=" << path;
251  sendXSMap( xm);
252 }
253 
254 
255 void ArnSync::sendInfo( int type, const QByteArray& data)
256 {
257  _commandMap.clear();
258  _commandMap.add(ARNRECNAME, "info").add("type", QByteArray::number( type));
259  _commandMap.add("data", data);
260 
261  sendXSMap( _commandMap);
262 }
263 
264 
265 void ArnSync::sendMessage( int type, const QByteArray& data)
266 {
267  _commandMap.clear();
268  _commandMap.add(ARNRECNAME, "message").add("type", QByteArray::number( type));
269  _commandMap.add("data", data);
270 
271  sendXSMap( _commandMap);
272 }
273 
274 
275 void ArnSync::sendLogin( int seq, const Arn::XStringMap& xsMap)
276 {
277  XStringMap xm;
278  xm.add(ARNRECNAME, "login").add("seq", QByteArray::number( seq));
279  xm.add( xsMap);
280 
281  // qDebug() << "ArnSync-login: xs=" << xm.toXString();
282  sendXSMap( xm);
283 }
284 
285 
286 void ArnSync::sendExit()
287 {
288  XStringMap xm;
289  xm.add(ARNRECNAME, "exit");
290 
291  sendXSMap( xm);
292 }
293 
294 
295 uint ArnSync::remoteVer(uint index)
296 {
297  if (index >= 2) return 0; // Out of bound
298 
299  return _remoteVer[ index];
300 }
301 
302 
304 void ArnSync::setupItemNet( ArnItemNet* itemNet, uint netId)
305 {
306  itemNet->setNetId( netId);
307  _itemNetMap.insert( netId, itemNet);
308 
309  itemNet->setEventHandler( this);
310 
311 #if 0
312  if (itemNet->path() == "/") {
313  qDebug() << "ArnSync setupItemNet root: eventH=" << itemNet->eventHandler()
314  << " sessionH=" << itemNet->sessionHandler()
315  << " isMon=" << itemNet->isMonitor() << " netId=" << itemNet->netId()
316  << " itemNet=" << itemNet;
317  }
318 #endif
319 }
320 
321 
322 void ArnSync::itemValueUpdater( const ArnLinkHandle& handleData, const QByteArray* valueData, ArnItemNet* itemNet)
323 {
324  if (!itemNet) return;
325 
326  if (itemNet->isLeadValueUpdate())
327  addToFluxQue( handleData, valueData, itemNet);
328 }
329 
330 
331 void ArnSync::itemModeUpdater( ArnItemNet* itemNet)
332 {
333  if (!itemNet) return;
334 
335  if (itemNet->isLeadModeUpdate())
336  addToModeQue( itemNet);
337 }
338 
339 
// Create (or reuse) the net item that shares the Arn object at path.
// Returns arnNullptr when the remote side grants neither read nor write and
// the remote path is not a free path, when the item can not be opened, or
// when the item is already synced and isNewPtr is null.
// When isNewPtr is given it is set to true for a newly created item and to
// false when an already synced item is returned instead.
// NOTE(review): the embedded listing numbers jump 349->351, so the end of
// the ArnM::errorLog( call is not present in this copy of the file.
341 ArnItemNet* ArnSync::newNetItem( const QString& path,
342  Arn::ObjectSyncMode syncMode, bool* isNewPtr)
343 {
344  if (!_remoteAllow.isAny( _allow.ReadWrite)) {
345  QString remotePath = (*_toRemotePathCB)( _sessionHandler, path);
346  if (!isFreePath( remotePath)) {
347  ArnM::errorLog( QString(tr("Share ArnObject: path=")) +
348  path + " remoteAllow=" + _remoteAllow.toString() +
349  " (" + QString::number(_remoteAllow.toInt()) + ")",
351  return arnNullptr;
352  }
353  }
354 
355  ArnItemNet* itemNet = new ArnItemNet( _sessionHandler);
356  if (!itemNet->open( path)) {
357  delete itemNet;
358  return arnNullptr;
359  }
360 
361  uint netId = itemNet->linkId(); // Use clients linkId as netID for this Item
362  if (_itemNetMap.contains( netId)) { // Item is already synced by this client
363  if (isNewPtr) { // Allow duplicate ref, indicate this is not new
 // Drop the temporary item and hand out the registered one instead
364  delete itemNet;
365  itemNet = _itemNetMap.value( netId, arnNullptr);
366  itemNet->addSyncMode( syncMode, true);
367  *isNewPtr = false;
368  return itemNet;
369  }
370  else { // Not allow duplicate ref, return error;
371  qDebug() << "Arn netSync Item already synced: path=" << itemNet->path();
372  delete itemNet;
373  return arnNullptr;
374  }
375  }
376  if (isNewPtr)
377  *isNewPtr = true;
378 
379  itemNet->addSyncMode( syncMode, true);
380  setupItemNet( itemNet, netId);
381  itemNet->setBlockEcho( true); // Client gives no echo to avoid endless looping
382 
 // While closed the item is only registered, it is not queued for sync here
383  if (_isClosed) return itemNet;
384 
385  _syncQueue.enqueue( itemNet);
386  // qDebug() << "Sync EnQueue: id=" << itemNet->netId() << " path=" << itemNet->path();
387 
388  if (!_isSending)
389  sendNext();
390 
391  return itemNet;
392 }
393 
394 
395 void ArnSync::setClientSyncMode( Arn::ClientSyncMode clientSyncMode)
396 {
397  _clientSyncMode = clientSyncMode;
398 }
399 
400 
401 void ArnSync::setSessionHandler( void* sessionHandler)
402 {
403  _sessionHandler = sessionHandler;
404 }
405 
406 
407 void ArnSync::setToRemotePathCB( ArnSync::ConVertPathCB toRemotePathCB)
408 {
409  _toRemotePathCB = toRemotePathCB;
410 }
411 
412 
413 QString ArnSync::nullConvertPath(void* context, const QString& path)
414 {
415  Q_UNUSED(context)
416 
417  return path;
418 }
419 
420 
421 void ArnSync::setWhoIAm( const QByteArray& whoIAm)
422 {
423  _whoIAm = whoIAm;
424 }
425 
426 
427 QByteArray ArnSync::remoteWhoIAm() const
428 {
429  return _remoteWhoIAm;
430 }
431 
432 
433 QString ArnSync::loginUserName() const
434 {
435  return _loginUserName;
436 }
437 
438 
439 Arn::Allow ArnSync::getAllow() const
440 {
441  return _allow;
442 }
443 
444 
445 void ArnSync::close()
446 {
447  _isConnectStarted = false;
448 
449  if (_isClosed) return;
450 
451  _isClosed = true;
452  if (!_isSending)
453  sendNext();
454  if (!_isConnected)
455  clearAllQueues();
456 }
457 
458 
459 void ArnSync::closeFinal()
460 {
461  sendExit();
462  _socket->disconnectFromHost();
463  _isConnected = false;
464 }
465 
466 
467 void ArnSync::clearNonPipeQueues()
468 {
469  _syncQueue.clear();
470  _modeQueue.clear();
471  _fluxItemQueue.clear();
472 }
473 
474 
475 void ArnSync::clearAllQueues()
476 {
477  clearNonPipeQueues();
479  _fluxRecPool += _fluxPipeQueue;
480  _fluxPipeQueue.clear();
481 }
482 
483 
484 void ArnSync::setRemoteVer( const QByteArray& remVer)
485 {
486  if (remVer.isEmpty()) return;
487 
488  _remoteVer[1] = 0; // Default
489  QList<QByteArray> remVerParts = remVer.split('.');
490  int partsNum = qMin( remVerParts.size(), 2);
491  for (int i = 0; i < partsNum; ++i) {
492  _remoteVer[i] = remVerParts.at(i).toUInt();
493  }
494 
495  XStringMap::Options xop;
496  if (_remoteVer[0] >= 4) {
497  xop = xop.NullTilde | xop.RepeatLen | xop.Frame;
498  }
499  _commandMap.setOptions( xop);
500  _replyMap.setOptions( xop);
501  _syncMap.setOptions( xop);
502 }
503 
504 
505 void ArnSync::setState( ArnSync::State state)
506 {
507  if (state == _state) return; // State already set
508 
509  _state = state;
510  emit stateChanged( _state);
511 }
512 
513 
514 bool ArnSync::isFreePath( const QString& path) const
515 {
516  foreach (const QString& freePath, _freePathTab) {
517  if (path.startsWith( freePath)) return true;
518  }
519  return false;
520 }
521 
522 
523 void ArnSync::socketInput()
524 {
525  _dataReadBuf.resize( int(_socket->bytesAvailable()));
526  int nbytes = int(_socket->read( _dataReadBuf.data(), qint64(_dataReadBuf.size())));
527  if (nbytes <= 0) return; // No bytes / error
528  if (_isClosed) return;
529 
530  _dataReadBuf.resize( nbytes);
531  _dataRemain += _dataReadBuf;
532  _trafficIn += uint( nbytes);
533 
534  QByteArray xString;
535  int pos;
536  while ((pos = _dataRemain.indexOf("\n")) >= 0) {
537  // Set xString to string before \n
538  xString.resize(0);
539  xString.append( _dataRemain.constData(), pos);
540  _dataRemain.remove(0, pos + 1); // Set remain to string after \n
541 
542  xString.replace('\r', ""); // Remove any \r
543  _commandMap.fromXString( xString); // Load command Map
544  _replyMap.clear(); // Reset reply Map
545 
546  if (Arn::debugRecInOut) qDebug() << "Rec-in: " << xString;
547  doCommands();
548 
549  if (_replyMap.size()) {
550  sendXSMap( _replyMap);
551  // _replySendingCount++;
552  // cout << "REPLY: |" << _replyMap.toXString() << "|" << endl;
553  }
554  }
555 }
556 
557 
558 void ArnSync::doCommands()
559 {
560  uint stat = ArnError::Ok;
561  QByteArray command = _commandMap.value(0);
562 
564  if (command == "flux") {
565  stat = doCommandFlux();
566  }
567  else if (command == "atomop") {
568  stat = doCommandAtomOp();
569  }
570  else if (command == "event") {
571  stat = doCommandEvent();
572  }
573  else if (command == "get") {
574  stat = doCommandGet();
575  }
576  else if (command == "set") {
577  stat = doCommandSet();
578  }
579  else if (command == "sync") {
580  stat = doCommandSync();
581  }
582  else if (command == "mode") {
583  stat = doCommandMode();
584  }
585  else if (command == "nosync") {
586  stat = doCommandNoSync();
587  }
588  else if (command == "destroy") { // Legacy: Obsolete, will be phased out
589  stat = doCommandDelete();
590  }
591  else if (command == "delete") {
592  stat = doCommandDelete();
593  }
594  else if (command == "message") {
595  stat = doCommandMessage();
596  }
597  else if (command == "ls") {
598  stat = doCommandLs();
599  }
600  else if (command == "info") {
601  stat = doCommandInfo();
602  }
603  else if (command == "Rinfo") {
604  stat = doCommandRInfo();
605  }
606  else if (command == "ver") {
607  stat = doCommandVer();
608  }
609  else if (command == "Rver") {
610  stat = doCommandRVer();
611  }
612  else if (command == "login") {
613  stat = doCommandLogin();
614  }
615  else if (command == "exit") {
616  _socket->disconnectFromHost();
617  }
619  else if (command == "err") {
620  qDebug() << "REC-ERR: |" << _commandMap.toXString() << "|";
621  }
622  else if (command.startsWith('R')); // No error on unhandled R-commands
623  else {
624  stat = ArnError::RecUnknown;
625  _replyMap.add(ARNRECNAME, "err");
626  _replyMap.add("data", QByteArray("Unknown record:") + command);
627  }
628 
629  if ((_replyMap.size() == 0) && (stat != ArnError::Ok)) {
630  _replyMap.add(ARNRECNAME, "err");
631  _replyMap.add("data", QByteArray("record:") + command +
632  " errTxt:" + ArnError::txt().getTxt( int( stat)));
633  }
634 
635  if (_replyMap.size()) {
636  _replyMap.add("stat", QByteArray::number( stat));
637  }
638 
639  if (command.startsWith('R')) { // Forward all R-commands
640  emit replyRecord( _commandMap);
641  }
642 }
643 
644 
645 void ArnSync::startLogin()
646 {
648  _loginSalt1 = Arn::rand();
649  XStringMap xsm;
650  xsm.addNum("demand", _isDemandLogin).addNum("salt1", _loginSalt1);
651  sendLogin( 0, xsm);
652  _loginNextSeq = 1;
653 }
654 
655 
656 void ArnSync::loginToArn( const QString& userName, const QString& passwordHash, Arn::Allow allow)
657 {
659  _loginUserName = userName;
660  _loginPwHash = passwordHash;
661  _allow = allow;
662 
663  loginToArn();
664 }
665 
666 
667 void ArnSync::loginToArn()
668 {
670  if (_loginUserName.isEmpty()) return; // Empty username not valid, do nothing
671  if (_loginNextSeq != -2) return; // Not in correct seq to perform this login step
672 
673  QByteArray pwHashX = ArnSyncLogin::pwHashXchg( _loginSalt1, _loginSalt2, _loginPwHash);
674  XStringMap xsm;
675  xsm.add("user", _loginUserName).add("pass", pwHashX);
676  sendLogin( 2, xsm);
677  _loginNextSeq = 3;
678 }
679 
680 
681 void ArnSync::doLoginSeq0End()
682 {
684  _loginDelayTimer.stop();
685 
686  _loginSalt2 = Arn::rand();
687  XStringMap xsm;
688  xsm.add("demand", QByteArray::number( _isDemandLogin));
689  xsm.add("salt2", QByteArray::number( _loginSalt2));
690  sendLogin( 1, xsm);
691  _loginNextSeq = 2;
692 }
693 
694 
695 uint ArnSync::doCommandLogin()
696 {
697  if (_isClientSide && (_state != State::Login)) {
698  return ArnError::LoginBad; // Not acceptable command when client is not logging in
699  }
700 
701  int seq =_commandMap.value("seq").toInt();
702  if (seq && (seq != _loginNextSeq)) return ArnError::LoginBad;
703 
704  switch (seq) {
705  case 0:
706  {
707  if (_isClientSide) return ArnError::LoginBad;
708 
710  _loginSalt1 = _commandMap.value("salt1").toUInt();
711  if (_loginNextSeq == 0) { // First login try
712  doLoginSeq0End(); // Continue imediately
713  }
714  else { // Login retry
715  _loginNextSeq = -1; // Temporary invalid seq while loginDelay
716  _loginDelayTimer.start(2000); // Delayed doLoginSeq0End()
717  }
718  break;
719  }
720  case 1:
721  {
722  if (!_isClientSide) return ArnError::LoginBad;
723 
725  _loginSalt2 = _commandMap.value("salt2").toUInt();
726  bool isRemoteDemandLogin = _commandMap.value("demand").toUInt() != 0;
727  if (_isDemandLogin || isRemoteDemandLogin) {
728  _loginNextSeq = -2; // Temporary invalid seq while login is handled by application
729  _remoteAllow = Arn::Allow::None;
730  emit loginRequired( _loginReqCode);
731  }
732  else { // Login not needed
733  _remoteAllow = Arn::Allow::All;
734  startNormalSync();
735  _loginNextSeq = -1;
736  }
737  break;
738  }
739  case 2:
740  {
741  if (_isClientSide) return ArnError::LoginBad;
742 
744  QByteArray userClient = _commandMap.value("user");
745  QByteArray pwHashXClient = _commandMap.value("pass");
746  QByteArray pwHashXServer;
747  _loginUserName = QString::fromUtf8( userClient.constData(), userClient.size());
748 
749  int stat = 0;
750  _allow = Arn::Allow::None; // Deafult no access
751  const ArnSyncLogin::AccessSlot* accSlot = _arnLogin->findAccess( userClient);
752  if (accSlot) {
753  QByteArray pwHashX = ArnSyncLogin::pwHashXchg( _loginSalt1, _loginSalt2, accSlot->pwHash);
754  if (pwHashXClient == pwHashX) {
755  _allow = accSlot->allow;
756  pwHashXServer = ArnSyncLogin::pwHashXchg( _loginSalt2, _loginSalt1, accSlot->pwHash);
757  stat = 1;
758  }
759  }
760 
761  XStringMap xsm;
762  xsm.add("stat", QByteArray::number( stat));
763  xsm.add("allow", QByteArray::number( _allow.toInt()));
764  xsm.add("pass", pwHashXServer);
765  sendLogin( 3, xsm);
766  _loginNextSeq = 4;
767  break;
768  }
769  case 3:
770  {
771  if (!_isClientSide) return ArnError::LoginBad;
772 
774  int statServer = _commandMap.value("stat").toInt();
775  _remoteAllow = Arn::Allow::fromInt( _commandMap.value("allow").toInt());
776  QByteArray pwHashXServer = _commandMap.value("pass");
777 
778  QByteArray pwHashX = ArnSyncLogin::pwHashXchg( _loginSalt2, _loginSalt1, _loginPwHash);
779  int stat = 0;
780  if (!statServer)
781  _loginReqCode = 1; // Server deny, login retry
782  else if (pwHashXServer != pwHashX)
783  _loginReqCode = 2; // Client deny, server not ok
784  else
785  stat = 1; // All ok
786 
787  XStringMap xsm;
788  xsm.add("stat", QByteArray::number( stat));
789  xsm.add("allow", QByteArray::number( stat ? _allow.toInt() : 0));
790  sendLogin( 4, xsm);
791  _loginNextSeq = -1;
792 
793  if (stat) {
794  emit loginCompleted();
795  startNormalSync();
796  }
797  else {
798  startLogin();
799  }
800  break;
801  }
802  case 4:
803  {
804  if (_isClientSide) return ArnError::LoginBad;
805 
807  int stat = _commandMap.value("stat").toInt();
808  _remoteAllow = Arn::Allow::fromInt( _commandMap.value("allow").toInt());
809  _loginNextSeq = -1;
810 
811  if (stat) {
812  emit loginCompleted();
813  setState( State::Normal);
814  }
815  break;
816  }
817  default:
818  break;
819  }
820 
821  return ArnError::Ok;
822 }
823 
824 
825 void ArnSync::getTraffic( quint64& in, quint64& out) const
826 {
827  in = _trafficIn;
828  out = _trafficOut;
829 }
830 
831 
832 uint ArnSync::doCommandSync()
833 {
834  if (_isClientSide) return ArnError::RecNotExpected;
835 
836  QByteArray path = _commandMap.value("path");
837  QByteArray smode = _commandMap.value("smode");
838  uint netId = _commandMap.value("id").toUInt();
839  if (!_allow.isAny( _allow.ReadWrite) && !isFreePath( path)) return ArnError::OpNotAllowed;
840 
841  if (_itemNetMap.contains( netId)) { // Item is already synced by this server session
843  ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
844  qDebug() << "ArnSync CommandSync Item already synced: path=" << itemNet->path();
845  removeItemNetRefs( itemNet);
846  delete itemNet;
847  }
848 
849  bool isCreateAllow = _allow.is( _allow.Create);
850  Arn::LinkFlags createFlag = Arn::LinkFlags::flagIf( isCreateAllow, Arn::LinkFlags::CreateAllowed);
851  ArnItemNet* itemNet = new ArnItemNet( _sessionHandler);
852  if (!itemNet->openWithFlags( path, createFlag)) {
853  delete itemNet;
854  return isCreateAllow ? ArnError::CreateError : ArnError::OpNotAllowed;
855  }
856 
857  setupItemNet( itemNet, netId);
858  itemNet->addSyncModeString( smode, false); // SyncMode is only for the item (session), not the link
859 
860  Arn::ObjectSyncMode syncMode = itemNet->syncMode();
861  if (syncMode.is( syncMode.Monitor)) {
862  setupMonitorItem( itemNet);
863  }
864  if (!itemNet->getModeString().isEmpty()) { // If non default mode
865  itemModeUpdater( itemNet); // Make server send the current mode to client
866  }
867 
868  bool isBlockedValue = ((itemNet->type() == Arn::DataType::Null) && (_remoteVer[0] < 3)) ||
869  itemNet->isPipeMode() ||
870  itemNet->isFolder();
871  if (!isBlockedValue && !(itemNet->isMasterAtStart())) {
872  // Only send non blocked Value to non startMaster
873  itemNet->setSyncFlux( true);
874  itemNet->setSaveFlux( itemNet->isSaveMode());
875  itemValueUpdater( ArnLinkHandle::null(), arnNullptr, itemNet); // Make server send the current value to client
876  }
877 
878  return ArnError::Ok;
879 }
880 
881 
883 void ArnSync::setupMonitorItem(ArnItemNet *itemNet)
884 {
886  itemNet->setMonitor( true);
887 
889  doChildsToEvent( itemNet);
890 }
891 
892 
893 void ArnSync::doChildsToEvent( ArnItemNet *itemNet)
894 {
896  QString path = itemNet->path();
897  QStringList childList = itemNet->childItemsMain();
898  foreach (QString childName, childList) {
899  itemNet->sendNewItemMonEvent( Arn::makePath( path, childName), true);
900  }
901 }
902 
903 
904 uint ArnSync::doCommandMode()
905 {
906  if (!_allow.is( _allow.ModeChange)) return ArnError::OpNotAllowed;
907 
908  uint netId = _commandMap.value("id").toUInt();
909  QByteArray data = _commandMap.value("data");
910 
911  ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
912  if (!itemNet) {
913  return ArnError::NotFound;
914  }
915 
916  itemNet->setModeString( data);
917  return ArnError::Ok;
918 }
919 
920 
921 uint ArnSync::doCommandNoSync()
922 {
924  uint netId = _commandMap.value("id").toUInt();
925  if (netId) {
926  ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
927  if (!itemNet) { // Not existing item is ok, maybe destroyed before sync
928  return ArnError::Ok;
929  }
930 
931  removeItemNetRefs( itemNet);
932  delete itemNet;
933  return ArnError::Ok;
934  }
935 
937  QString path = _commandMap.valueString("path");
938  QList<ArnItemNet*> noSyncList;
939  // qDebug() << "ArnSync-noSync: path=" << path;
940  foreach (ArnItemNet* itemNet, _itemNetMap) {
941  if (itemNet->path().startsWith( path)) {
942  noSyncList += itemNet;
943  // qDebug() << "ArnSync-noSync: Add noSyncList path=" << itemNet->path();
944  }
945  }
946  // Make NoSync from list
947  foreach (ArnItemNet* itemNet, noSyncList) {
948  removeItemNetRefs( itemNet);
949  delete itemNet;
950  }
951  return ArnError::Ok;
952 }
953 
954 
955 uint ArnSync::doCommandFlux()
956 {
957  if (!_allow.is( _allow.Write)) return ArnError::OpNotAllowed;
958 
959  uint netId = _commandMap.value("id").toUInt();
960  QByteArray type = _commandMap.value("type");
961  QByteArray nqrx = _commandMap.value("nqrx");
962  QByteArray seq = _commandMap.value("seq");
963  QByteArray data = _commandMap.value("data");
964  qint8 echoSeq = qint8(_commandMap.value("es", "-1").toInt());
965 
966  bool isSyncFlux = type.contains("I"); // After sync from server/client
967  bool isSaveFlux = type.contains("S"); // Loaded persistent value
968  bool isOnlyEcho = type.contains("E"); // After sync from server/client, later from server
969  bool isNull = type.contains("N");
970 
971  ArnLinkHandle handleData;
972  handleData.flags().set( ArnLinkHandle::Flags::FromRemote);
973  if (!nqrx.isEmpty())
974  handleData.add( ArnLinkHandle::QueueFindRegexp,
975  QVariant( ARN_RegExp( QString::fromUtf8( nqrx.constData(), nqrx.size()))));
976  if (!seq.isEmpty())
977  handleData.add( ArnLinkHandle::SeqNo,
978  QVariant( seq.toInt()));
979 
980  ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
981  if (!itemNet) {
982  return ArnError::NotFound;
983  }
984 
985  bool isNullBlocked = isNull && (_clientSyncMode == Arn::ClientSyncMode::StdAutoMaster); // Only client
986  bool isEchoPipeBlocked = isOnlyEcho && itemNet->isPipeMode();
987  bool isEchoBidirBlocked = isOnlyEcho && !isSyncFlux && itemNet->isBiDirMode() && (_remoteVer[0] >= 3);
988  bool isEchoMasterBlocked = isOnlyEcho && _isClientSide && itemNet->isMaster() &&
989  (!isSaveFlux || (itemNet->type() != Arn::DataType::Null));
990  bool isEchoSeqBlocked = isOnlyEcho && _isClientSide && itemNet->isEchoSeqOld( echoSeq);
991  bool isValueBlocked = isNullBlocked || isEchoPipeBlocked || isEchoBidirBlocked ||
992  isEchoMasterBlocked || isEchoSeqBlocked;
993  if (!isValueBlocked) {
994  if (!_isClientSide)
995  itemNet->setEchoSeq( echoSeq);
996  bool isIgnoreSame = isOnlyEcho;
997  itemNet->arnImport( data, isIgnoreSame, handleData);
998  }
999  else if (_isClientSide && isNullBlocked && isSyncFlux
1000  && (itemNet->type() != Arn::DataType::Null)) {
1001  // Server only had Null, use Client non Null
1002  itemNet->setSyncFlux( true); // Part of the initial sync process
1003  itemValueUpdater( ArnLinkHandle::null(), arnNullptr, itemNet); // Make client send the current value to server
1004  }
1005  return ArnError::Ok;
1006 }
1007 
1008 
1009 uint ArnSync::doCommandAtomOp()
1010 {
1011  if (!_allow.is( _allow.Write)) return ArnError::OpNotAllowed;
1012 
1013  uint netId = _commandMap.value("id").toUInt();
1014  QByteArray opStr = _commandMap.value("op");
1015  QByteArray a1Str = _commandMap.value("a1");
1016  QByteArray a2Str = _commandMap.value("a2");
1017 
1018  ArnAtomicOp op = ArnAtomicOp::fromInt(
1019  ArnAtomicOp::txt().getEnumVal( opStr.constData(), ArnAtomicOp::None, ArnAtomicOp::NsCom));
1020  ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
1021  if (!itemNet) {
1022  // qDebug() << "doCommandAtomOp NotFound xs:" << _commandMap.toXString();
1023  return ArnError::NotFound;
1024  }
1025  if (!itemNet->isAtomicOpProvider()) // This is not a provider, just skip it
1026  return ArnError::Ok;
1027 
1028  switch (op) {
1029  case ArnAtomicOp::BitSet:
1030  itemNet->setBits( a1Str.toInt(), a2Str.toInt());
1031  break;
1032  case ArnAtomicOp::AddInt:
1033  itemNet->addValue( a1Str.toInt());
1034  break;
1035  case ArnAtomicOp::AddReal:
1036 #ifdef ARNREAL_FLOAT
1037  itemNet->addValue( a1Str.toFloat());
1038 #else
1039  itemNet->addValue( a1Str.toDouble());
1040 #endif
1041  break;
1042  default:
1043  return ArnError::Undef;
1044  }
1045 
1046  return ArnError::Ok;
1047 }
1048 
1049 
// Handle an "event" record: forward a monitor event of the given type and
// data to the item with the given id.
// Returns NotFound for an unknown id, except for an ItemDeleted event which
// is accepted with Ok.
// NOTE(review): the embedded listing numbers jump 1059->1061, so the end of
// the getEnumVal( call is not present in this copy of the file.
1050 uint ArnSync::doCommandEvent()
1051 {
1054 
1055  uint netId = _commandMap.value("id").toUInt();
1056  QByteArray typeStr = _commandMap.value("type");
1057  QByteArray data = _commandMap.value("data");
1058 
 // Translate the event type name to its enum value
1059  int type = ArnMonEventType::txt().getEnumVal( typeStr.constData(),
1061  ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
1062  if (!itemNet) {
1063  if (type == ArnMonEventType::ItemDeleted) return ArnError::Ok; // Item already deleted
1064 
1065  // qDebug() << "doCommandEvent NotFound xs:" << _commandMap.toXString();
1066  return ArnError::NotFound;
1067  }
1068 
1069  itemNet->sendMonEvent( type, data, false);
1070  return ArnError::Ok;
1071 }
1072 
1073 
1074 uint ArnSync::doCommandSet()
1075 {
1076  if (_isClientSide) return ArnError::RecNotExpected;
1077  if (!_allow.is( _allow.Write)) return ArnError::OpNotAllowed;
1078 
1079  QByteArray path = _commandMap.value("path");
1080  QByteArray data = _commandMap.value("data");
1081 
1082  _replyMap.add(ARNRECNAME, "Rset").add("path", path);
1083 
1084  bool isCreateAllow = _allow.is( _allow.Create);
1085  Arn::LinkFlags createFlag = Arn::LinkFlags::flagIf( isCreateAllow, Arn::LinkFlags::CreateAllowed);
1086  ArnItemNet item( _sessionHandler);
1087  if (!item.openWithFlags( path, createFlag)) {
1088  return createFlag ? ArnError::CreateError : ArnError::OpNotAllowed;
1089  }
1090 
1091  if (!item.isFolder()) {
1092  ArnLinkHandle handleData;
1093  handleData.flags().set( ArnLinkHandle::Flags::FromRemote);
1094  item.arnImport( data, Arn::SameValue::Accept, handleData );
1095  }
1096 
1097  return ArnError::Ok;
1098 }
1099 
1100 
1101 uint ArnSync::doCommandGet()
1102 {
1103  if (_isClientSide) return ArnError::RecNotExpected;
1104 
1105  QByteArray path = _commandMap.value("path");
1106  if (!_allow.is( _allow.Read) && !isFreePath( path)) return ArnError::OpNotAllowed;
1107 
1108  _replyMap.add(ARNRECNAME, "Rget").add("path", path);
1109 
1110  bool isCreateAllow = _allow.is( _allow.Create);
1111  ArnItem item;
1112  if (!item.open( path)) {
1113  return isCreateAllow ? ArnError::CreateError : ArnError::OpNotAllowed;
1114  }
1115 
1116  QByteArray type;
1117  if (item.type() == Arn::DataType::Null) type += "N";
1118  if (!type.isEmpty())
1119  _replyMap.add("type", type);
1120 
1121  _replyMap.add("data", item.arnExport());
1122  return ArnError::Ok;
1123 }
1124 
1125 
1126 uint ArnSync::doCommandLs()
1127 {
1128  if (_isClientSide) return ArnError::RecNotExpected;
1129 
1130  QByteArray path = _commandMap.value("path");
1131  if (!_allow.is( _allow.Read) && !isFreePath( path)) return ArnError::OpNotAllowed;
1132 
1133  _replyMap.add(ARNRECNAME, "Rls").add("path", path);
1134 
1135  if (ArnM::isFolder( path)) {
1136  QStringList subitems = ArnM::items( path);
1137  int nItems = subitems.size();
1138 
1139  for (int i = 0; i < nItems; ++i) {
1140  _replyMap.add("item", uint(i + 1), subitems.at(i));
1141  }
1142  }
1143  else {
1144  return ArnError::NotFound;
1145  }
1146 
1147  return ArnError::Ok;
1148 }
1149 
1150 
1151 uint ArnSync::doCommandDelete()
1152 {
1153  if (!_allow.is( _allow.Delete)) return ArnError::OpNotAllowed;
1154 
1155  uint netId = _commandMap.value("id", "0").toUInt();
1156 
1157  if (netId) {
1158  ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
1159  if (!itemNet) { // Not existing item is ok, maybe destroyed before this
1160  return ArnError::Ok;
1161  }
1162 
1163  itemNet->setDisable(); // Defunc the item to prevent sending destroy record (MW: problem with twin)
1164  itemNet->destroyLink();
1165  }
1166  else {
1167  QByteArray path = _commandMap.value("path");
1168  if (path.isEmpty()) return ArnError::NotFound;
1169 
1170  emit xcomDelete( path);
1171  }
1172 
1173  return ArnError::Ok;
1174 }
1175 
1176 
1177 uint ArnSync::doCommandMessage()
1178 {
1179  int type = _commandMap.value("type").toInt();
1180  QByteArray data = _commandMap.value("data");
1181 
1182  emit messageReceived( type, data);
1183 
1184  return ArnError::Ok;
1185 }
1186 
1187 
1188 uint ArnSync::doCommandInfo()
1189 {
1190  if (_isClientSide) return ArnError::RecNotExpected;
1191 
1194  int type = _commandMap.value("type").toInt();
1195  QByteArray data = _commandMap.value("data");
1196 
1197  XStringMap xmIn( data);
1198  XStringMap xmOut;
1199 
1200  switch (type) {
1202  case Arn::InfoType::Custom:
1203  xmOut.add( _customMap);
1204  break;
1206  case InfoType::FreePaths:
1207  xmOut.addValues( _freePathTab);
1208  break;
1209  case InfoType::WhoIAm:
1210  _remoteWhoIAm = data;
1211  xmOut.fromXString( _whoIAm);
1212  break;
1213  default:;
1214  // Not supported info-type, send empty data reply.
1215  // Client will ask all internal types it support. That chain shall not be broken.
1216  }
1217 
1218  emit infoReceived( type);
1219 
1220  _replyMap.add(ARNRECNAME, "Rinfo").add("type", QByteArray::number( type));
1221  _replyMap.add("data", xmOut.toXString());
1222  return ArnError::Ok;
1223 }
1224 
1225 
1226 uint ArnSync::doCommandRInfo()
1227 {
1228  if (!_isClientSide) return ArnError::RecNotExpected;
1229 
1231  if (_state == State::Info) {
1232  int type = _commandMap.value("type").toInt();
1233  QByteArray data = _commandMap.value("data");
1234 
1235  doInfoInternal( type, data);
1236 
1237  emit infoReceived( type);
1238  }
1239 
1240  return ArnError::Ok;
1241 }
1242 
1243 
1244 uint ArnSync::doCommandVer()
1245 {
1246  if (_isClientSide) return ArnError::RecNotExpected;
1247 
1249  if (_state == State::Init) {
1250  setRemoteVer( _commandMap.value("ver", "1.0")); // ver key only after version 1.0
1251  if (_remoteVer[0] >= 2)
1252  setState( State::Login);
1253  else
1254  setState( State::Normal); // Just in case, this should not be reached ...
1255  }
1256  else {
1257  setRemoteVer( _commandMap.value("ver", "")); // ver key optional, used for setting remoteVer
1258  }
1259 
1260  _replyMap.add(ARNRECNAME, "Rver").add("type", "ArnNetSync").add("ver", ARNSYNCVER);
1261  return ArnError::Ok;
1262 }
1263 
1264 
1265 uint ArnSync::doCommandRVer()
1266 {
1267  if (!_isClientSide) return ArnError::RecNotExpected;
1268 
1270  if (_state == State::Version) {
1271  setRemoteVer( _commandMap.value("ver", "1.0")); // ver key only after version 1.0
1272  if (_remoteVer[0] >= 2) {
1273  setState( State::Info);
1274  _curInfoType = InfoType::Start;
1275  doInfoInternal( InfoType::Start);
1276  }
1277  else if (!_isDemandLogin) { // Old server do not support login, Ok
1278  _remoteAllow = Arn::Allow::All;
1279  startNormalSync();
1280  }
1281  else { // Old server do not support login, Fail
1282  setState( State::Login);
1283  emit loginRequired(3);
1284  }
1285  }
1286 
1287  return ArnError::Ok;
1288 }
1289 
1290 
1291 bool ArnSync::isDemandLogin() const
1292 {
1293  return _isDemandLogin;
1294 }
1295 
1296 
1297 void ArnSync::setDemandLogin( bool isDemandLogin)
1298 {
1299  _isDemandLogin = isDemandLogin;
1300 }
1301 
1302 
1303 void ArnSync::addFreePath( const QString& path)
1304 {
1305  if (!_freePathTab.contains( path))
1306  _freePathTab += path;
1307 }
1308 
1309 
1310 QStringList ArnSync::freePaths() const
1311 {
1312  return _freePathTab;
1313 }
1314 
1315 
1316 void ArnSync::connectStarted()
1317 {
1318  if (!_isConnectStarted) { // First since last closed state
1319  _isConnectStarted = true;
1320  clearAllQueues();
1321  }
1322 }
1323 
1324 
1325 void ArnSync::connected()
1326 {
1327  if (!_isClientSide) return; // Only client side
1328 
1329  _isClosed = false;
1330  _isConnected = true;
1331  _remoteAllow = Arn::Allow::None;
1332 
1333  setState( State::Version);
1334  XStringMap xsm;
1335  xsm.add(ARNRECNAME, "ver").add("type", "ArnNetSync").add("ver", ARNSYNCVER);
1336  sendXSMap( xsm);
1337 }
1338 
1339 
1340 void ArnSync::disConnected()
1341 {
1342  _isConnected = false;
1343  _isSending = false;
1344 
1345  if (_isClientSide) { // Client
1346  if (_isClosed) {
1347  clearAllQueues();
1348  }
1349  foreach (ArnItemNet* itemNet, _itemNetMap) {
1350  itemNet->onConnectStop();
1351  }
1352  }
1353  else { // Server
1355  QList<uint> destroyList;
1356  foreach (ArnItemNet* itemNet, _itemNetMap) {
1357  if (itemNet->isAutoDestroy()) {
1358  destroyList += itemNet->netId();
1359  // qDebug() << "Server-disconnect: destroyList path=" << itemNet->path();
1360  }
1361  }
1363  foreach (uint netId, destroyList) {
1364  ArnItemNet* itemNet = _itemNetMap.value( netId, 0);
1365  if (itemNet) { // if this itemNet still exist
1366  // qDebug() << "Server-disconnect: Destroy path=" << itemNet->path();
1367  itemNet->destroyLink( true); // The itemNet will be destroyed (gblobally)
1368  }
1369  }
1370 
1371  deleteLater();
1372  }
1373 }
1374 
1375 
1376 void ArnSync::removeItemNetRefs( ArnItemNet* itemNet)
1377 {
1378  if (!itemNet) return;
1379 
1380  int s;
1381  s = _itemNetMap.remove( itemNet->netId());
1382  // qDebug() << "... remove from itemMap num=" << s;
1383  s = _syncQueue.removeAll( itemNet);
1384  // qDebug() << "... remove from syncQueue num=" << s;
1385  s = _modeQueue.removeAll( itemNet);
1386  // qDebug() << "... remove from modeQueue num=" << s;
1387  s = _fluxItemQueue.removeAll( itemNet);
1388  // qDebug() << "... remove from fluxQueue num=" << s;
1389  ++s; // Gets rid of warning
1390 }
1391 
1392 
1393 void ArnSync::doArnMonEvent( int type, const QByteArray& data, bool isLocal, ArnItemNet* itemNet)
1394 {
1395  if (!itemNet) {
1396  ArnM::errorLog( QString(tr("Can't get ArnItemNet sender for doArnEvent")),
1397  ArnError::Undef);
1398  return;
1399  }
1402 
1403 
1404  if (isLocal) { // Local events are always sent to remote side
1405  //_remoteAllow = Arn::Allow::All; // Test
1406  if (_remoteAllow.is( _allow.Read)
1407  || (isFreePath( (*_toRemotePathCB)( _sessionHandler, itemNet->path()))))
1408  {
1409  eventToFluxQue( itemNet->netId(), type, data);
1410  // Allow ok, as this item has been aproved by sync
1411  }
1412  //_remoteAllow = Arn::Allow::None;
1413  }
1414 
1415  if (type == ArnMonEventType::MonitorStart) {
1416  if (Arn::debugMonitorTest) qDebug() << "ArnMonitor-Test: monitorStart Event";
1417  // Allow ok, as this item has been aproved by sync
1418 
1419  Arn::ObjectSyncMode syncMode = itemNet->syncMode();
1420  if (isLocal && _isClientSide) { // Client Side
1421  itemNet->addSyncMode( syncMode.Monitor, true); // Will demand monitor if resynced (e.g. server restart)
1422  }
1423  else if (!isLocal && !_isClientSide && !syncMode.is( syncMode.Monitor)) { // Server side
1424  setupMonitorItem( itemNet); // Item will function as a Monitor
1425  itemNet->addSyncMode( syncMode.Monitor, false); // Indicate is Monitor, prevent duplicate setup
1426  }
1427  }
1428  if (type == ArnMonEventType::MonitorReStart) {
1429  if (Arn::debugMonitorTest) qDebug() << "ArnMonitor-Test: monitorReStart Event";
1430  // Allow ok, as this item has been aproved by sync
1431 
1432  if (!isLocal && !_isClientSide) { // Server side
1434  doChildsToEvent( itemNet);
1435  }
1436  }
1437 }
1438 
1439 
1440 void ArnSync::doInfoInternal( int infoType, const QByteArray& data)
1441 {
1442  XStringMap xmIn( data);
1443  // XStringMap xmOut; // Tobe used later
1444 
1445  if (infoType != _curInfoType) { // Not ok sequence
1446  emit loginRequired(4);
1447  return;
1448  }
1449 
1450  switch (infoType) {
1451  case InfoType::Start:
1452  _curInfoType = InfoType::FreePaths;
1453  sendInfo( InfoType::FreePaths);
1454  break;
1455  case InfoType::FreePaths:
1456  _freePathTab = xmIn.values();
1457  _curInfoType = InfoType::WhoIAm;
1458  sendInfo( InfoType::WhoIAm, _whoIAm);
1459  break;
1460  case InfoType::WhoIAm:
1461  _remoteWhoIAm = data;
1462 
1463  setState( State::Login);
1464  _loginReqCode = 0;
1465  startLogin();
1466  break;
1467  default:
1468  break;
1469  }
1470 }
1471 
1472 
1473 void ArnSync::addToFluxQue( const ArnLinkHandle& handleData, const QByteArray* valueData,
1474  ArnItemNet* itemNet)
1475 {
1476  if (!itemNet) return;
1477 
1478  if (itemNet->isPipeMode()) {
1479  if (!_isConnectStarted) return;
1480 
1481  if (itemNet->isOnlyEcho()
1482  || (itemNet->type() == Arn::DataType::Null)
1483  || (!_remoteAllow.is( _allow.Write)
1484  && (_isClientSide || !isFreePath( itemNet->path()))))
1485  {
1486  // qDebug() << "Flux skip pipe echo: path=" << itemNet->path() << " data=" << itemNet->arnExport()
1487  // << "itemId=" << itemNet->itemId();
1488  itemNet->resetDirtyValue(); // Arm for more updates
1489  return; // Don't send any Echo or Null to a Pipe or not allowed op on remote side
1490  }
1491 
1492  FluxRec* fluxRec = getFreeFluxRec();
1493  fluxRec->xString += makeFluxString( itemNet, handleData, valueData);
1494  itemNet->resetDirtyValue();
1495 
1496  if (handleData.has( ArnLinkHandle::QueueFindRegexp)) {
1497  ARN_RegExp rx( handleData.valueRef( ArnLinkHandle::QueueFindRegexp).ARN_ToRegExp());
1498  // qDebug() << "AddFluxQueue Pipe QOW: rx=" << rx.pattern();
1499  int i;
1500  for (i = 0; i < _fluxPipeQueue.size(); ++i) {
1501  FluxRec*& fluxRecQ = _fluxPipeQueue[i];
1502  _syncMap.fromXString( fluxRecQ->xString);
1503  QString fluxDataStrQ = _syncMap.valueString("data");
1504  if (rx.indexIn( fluxDataStrQ) >= 0) { // Match
1505  // qDebug() << "AddFluxQueue Pipe QOW match: old:"
1506  // << fluxRecQ->xString << " new:" << fluxRec->xString;
1507  _fluxRecPool += fluxRecQ; // Free item to be replaced
1508  fluxRecQ = fluxRec;
1509  i = -1; // Mark match
1510  break;
1511  }
1512  }
1513  if (i >= 0) { // No match
1514  // qDebug() << "AddFluxQueue Pipe QOW nomatch:"
1515  // << fluxRec->xString;
1516  _fluxPipeQueue.enqueue( fluxRec);
1517  }
1518  }
1519  else { // Normal Pipe
1520  // qDebug() << "AddFluxQueue Pipe:"
1521  // << fluxRec->xString;
1522  _fluxPipeQueue.enqueue( fluxRec);
1523  }
1524  }
1525  else { // Normal Item
1526  if (_isClosed) return;
1527 
1528  bool isEchoBidirBlocked = itemNet->isOnlyEcho() && itemNet->isBiDirMode() &&
1529  !itemNet->isSyncFlux();
1530  bool isEchoMasterBlocked = !_isClientSide && itemNet->isMaster() && itemNet->isOnlyEcho() &&
1531  !itemNet->isSyncFlux();
1532  bool isRemAllowBlocked = !_remoteAllow.is( _allow.Write)
1533  && (_isClientSide || !isFreePath( itemNet->path()));
1534  if (isEchoBidirBlocked || isEchoMasterBlocked || isRemAllowBlocked) {
1535  itemNet->resetDirtyValue(); // Arm for more updates
1536  return; // Don't send
1537  }
1538 
1539  itemNet->setQueueNum( ++_queueNumCount);
1540  _fluxItemQueue.enqueue( itemNet);
1541  }
1542 
1543  if (!_isSending) {
1544  sendNext();
1545  }
1546 }
1547 
1548 
1549 void ArnSync::eventToFluxQue( uint netId, int type, const QByteArray& data)
1550 {
1551  if (!netId) return; // Not valid id, item was disabled
1552  if (!_isConnectStarted) return;
1553 
1554  const char* typeStr = ArnMonEventType::txt().getTxt( type, ArnMonEventType::NsCom);
1555  FluxRec* fluxRec = getFreeFluxRec();
1556  _syncMap.clear();
1557  _syncMap.add(ARNRECNAME, "event");
1558  _syncMap.add("id", QByteArray::number( netId));
1559  _syncMap.add("type", typeStr);
1560  _syncMap.add("data", data);
1561  fluxRec->xString += _syncMap.toXString();
1562  _fluxPipeQueue.enqueue( fluxRec);
1563 
1564  if (!_isSending) {
1565  sendNext();
1566  }
1567 }
1568 
1569 
1570 void ArnSync::atomicOpToFluxQue( int op, const QVariant& arg1, const QVariant& arg2, const ArnItemNet* itemNet)
1571 {
1572  if (!itemNet) return;
1573  if (!_isConnectStarted) return;
1574 
1575  const char* opStr = ArnAtomicOp::txt().getTxt( op, ArnAtomicOp::NsCom);
1576  FluxRec* fluxRec = getFreeFluxRec();
1577  _syncMap.clear();
1578  _syncMap.add(ARNRECNAME, "atomop").add("id", QByteArray::number( itemNet->netId()));
1579  _syncMap.add("op", opStr);
1580  if (!arg1.isNull())
1581  _syncMap.add("a1", arg1.toString());
1582  if (!arg2.isNull())
1583  _syncMap.add("a2", arg2.toString());
1584  fluxRec->xString += _syncMap.toXString();
1585  _fluxPipeQueue.enqueue( fluxRec);
1586 
1587  if (!_isSending) {
1588  sendNext();
1589  }
1590 }
1591 
1592 
1593 void ArnSync::destroyToFluxQue( ArnItemNet* itemNet)
1594 {
1595  if (itemNet->isDisable()) return;
1596  if (!_isConnectStarted) return;
1597  if (!_remoteAllow.is( _allow.Delete)) return;
1598 
1599  ArnLink::RetireType rt = ArnLink::RetireType::fromInt( itemNet->retireType());
1600  if ((rt == rt.Tree) || (rt == rt.None)) return; // Not handled here ...
1601 
1602  bool isGlobal = (rt == rt.LeafGlobal) || !_isClientSide; // Server allways Global destroy leaf
1603  FluxRec* fluxRec = getFreeFluxRec();
1604  _syncMap.clear();
1605  const char* delCmd = (_remoteVer[0] >= 2) ? "delete" : "destroy";
1606  _syncMap.add(ARNRECNAME, isGlobal ? delCmd : "nosync")
1607  .add("id", QByteArray::number( itemNet->netId()));
1608  fluxRec->xString += _syncMap.toXString();
1609  _fluxPipeQueue.enqueue( fluxRec);
1610 
1611  if (!_isSending) {
1612  sendNext();
1613  }
1614 }
1615 
1616 
1617 ArnSync::FluxRec* ArnSync::getFreeFluxRec()
1618 {
1619  FluxRec* fluxRec;
1620 
1621  if (_fluxRecPool.empty()) {
1622  fluxRec = new FluxRec;
1623  }
1624  else {
1625  fluxRec = _fluxRecPool.takeLast();
1626  }
1627  fluxRec->xString.resize(0);
1628  fluxRec->queueNum = ++_queueNumCount;
1629 
1630  return fluxRec;
1631 }
1632 
1633 
1634 void ArnSync::addToModeQue( ArnItemNet* itemNet)
1635 {
1636  if (_isClosed) return;
1637  if (!itemNet) return;
1638 
1639  if (!_remoteAllow.is( _allow.ModeChange)
1640  && (_isClientSide || !isFreePath( itemNet->path())))
1641  {
1642  itemNet->resetDirtyMode(); // Arm for new mode update
1643  return;
1644  }
1645 
1646  _modeQueue.enqueue( itemNet);
1647 
1648  if (!_isSending) {
1649  sendNext();
1650  }
1651 }
1652 
1653 
1654 void ArnSync::sendNext()
1655 {
1656  _isSending = false;
1657 
1658  if (!_isConnected || !_socket->isValid()) return;
1659  if (_state != State::Normal) {
1660  if (_isClosed)
1661  closeFinal();
1662  return;
1663  }
1664 
1665  ArnItemNet* itemNet;
1666 
1667  if (!_syncQueue.isEmpty()) {
1668  itemNet = _syncQueue.dequeue();
1669  sendSyncItem( itemNet);
1670  _isSending = true;
1671  }
1672  else if (!_modeQueue.isEmpty()) {
1673  itemNet = _modeQueue.dequeue();
1674  sendModeItem( itemNet);
1675  itemNet->resetDirtyMode();
1676  _isSending = true;
1677  }
1678  else { // Flux queues - send entity with lowest queue number
1679  int itemQueueNum = _fluxItemQueue.isEmpty() ? _queueNumDone + INT_MAX : _fluxItemQueue.head()->queueNum();
1680  int pipeQueueNum = _fluxPipeQueue.isEmpty() ? _queueNumDone + INT_MAX : _fluxPipeQueue.head()->queueNum;
1681  int itemQueueRel = itemQueueNum - _queueNumDone;
1682  int pipeQueueRel = pipeQueueNum - _queueNumDone;
1683 
1684  if ((itemQueueRel < INT_MAX) || (pipeQueueRel < INT_MAX)) { // At least 1 flux queue not empty
1685  if (itemQueueRel < pipeQueueRel) { // Item flux queue
1686  _queueNumDone = itemQueueNum;
1687 
1688  itemNet = _fluxItemQueue.dequeue();
1689  sendFluxItem( itemNet);
1690  itemNet->resetDirtyValue();
1691  }
1692  else { // Pipe flux queue
1693  _queueNumDone = pipeQueueNum;
1694 
1695  FluxRec* fluxRec = _fluxPipeQueue.dequeue();
1696  _fluxRecPool += fluxRec;
1697  send( fluxRec->xString);
1698  }
1699  _isSending = true;
1700  }
1701  else { // Nothing more to send
1702  if (_isClosed)
1703  closeFinal();
1704  }
1705  }
1706 }
1707 
1708 
1709 QByteArray ArnSync::makeFluxString( const ArnItemNet* itemNet, const ArnLinkHandle& handleData,
1710  const QByteArray* valueData)
1711 {
1712  QByteArray type;
1713  if (itemNet->isSyncFlux()) type += "I";
1714  if (itemNet->isOnlyEcho()) type += "E";
1715  if (itemNet->isSaveFlux()) type += "S";
1716  if (itemNet->type() == Arn::DataType::Null) type += "N";
1717 
1718  _syncMap.clear();
1719  _syncMap.add(ARNRECNAME, "flux").add("id", QByteArray::number( itemNet->netId()));
1720 
1721  if (!type.isEmpty())
1722  _syncMap.add("type", type);
1723  qint8 echoSeq = itemNet->echoSeq();
1724  if (echoSeq >= 0)
1725  _syncMap.addNum("es", int(echoSeq));
1726 
1727  if (handleData.has( ArnLinkHandle::QueueFindRegexp))
1728  _syncMap.add("nqrx", handleData.valueRef( ArnLinkHandle::QueueFindRegexp).ARN_ToRegExp().pattern());
1729  else if (handleData.has( ArnLinkHandle::SeqNo))
1730  _syncMap.add("seq", QByteArray::number( handleData.valueRef( ArnLinkHandle::SeqNo).toInt()));
1731 
1732  _syncMap.add("data", valueData ? *valueData : itemNet->arnExport());
1733 
1734  return _syncMap.toXString();
1735 }
1736 
1737 
1738 void ArnSync::sendFluxItem( const ArnItemNet* itemNet)
1739 {
1740  if (!itemNet || !itemNet->isOpen()) {
1741  sendNext(); // Warning: this is recursion while not existing items
1742  return;
1743  }
1744 
1745  send( makeFluxString( itemNet, ArnLinkHandle::null(), arnNullptr));
1746 }
1747 
1748 
1749 void ArnSync::sendSyncItem( ArnItemNet* itemNet)
1750 {
1751  if (!itemNet || !itemNet->isOpen()) {
1752  sendNext(); // Warning: this is recursion while not existing items
1753  return;
1754  }
1755 
1756  _syncMap.clear();
1757  _syncMap.add(ARNRECNAME, "sync");
1758  _syncMap.add("path", (*_toRemotePathCB)( _sessionHandler, itemNet->path()));
1759  _syncMap.add("id", QByteArray::number( itemNet->netId()));
1760  QByteArray smode = itemNet->getSyncModeString();
1761  if (!smode.isEmpty()) {
1762  _syncMap.add("smode", smode);
1763  }
1764 
1765  if (Arn::debugShareObj) qDebug() << "Send sync: localPath=" << itemNet->path()
1766  << ", " << _syncMap.toXString();
1767  sendXSMap( _syncMap);
1768 }
1769 
1770 
1771 void ArnSync::sendModeItem( ArnItemNet* itemNet)
1772 {
1773  if (!itemNet || !itemNet->isOpen()) {
1774  sendNext(); // Warning: this is recursion while not existing items
1775  return;
1776  }
1777 
1778  _syncMap.clear();
1779  _syncMap.add(ARNRECNAME, "mode");
1780  _syncMap.add("id", QByteArray::number( itemNet->netId()));
1781  _syncMap.add("data", itemNet->getModeString());
1782  sendXSMap( _syncMap);
1783 }
1784 
1785 
1786 void ArnSync::customEvent( QEvent* ev)
1787 {
1788  // Is setup as ArnEvent handler for ArnItemNet
1789  // Selected handler must finish with ArnBasicItemEventHandler::defaultEvent( ev).
1790 
1791  int evIdx = ev->type() - ArnEvent::baseType();
1792  switch (evIdx) {
1794  {
1795  ArnEvValueChange* e = static_cast<ArnEvValueChange*>( ev);
1796  ArnItemNet* itemNet = static_cast<ArnItemNet*>( static_cast<ArnBasicItem*>( e->target()));
1797  if (!itemNet) break; // No target, deleted/closed ...
1798 
1799  quint32 sendId = e->sendId();
1800  bool isBlocked = itemNet->isBlock( sendId);
1801  // qDebug() << "ArnSync ArnEvValueChange: inItemPath=" << itemNet->path()
1802  // << " blockedUpdate=" << isBlocked;
1803  if (isBlocked) // Update was initiated from this Item, it can be blocked (e.g. client)
1804  break;
1805 
1806  itemNet->addIsOnlyEcho( sendId);
1807  if (_isClientSide) { // Client non echo
1808  itemNet->nextEchoSeq();
1809  itemNet->setSyncFlux( false);
1810  }
1811  else if (!itemNet->isOnlyEcho()) { // Server non echo
1812  itemNet->resetEchoSeq();
1813  itemNet->setSyncFlux( false);
1814  itemNet->setSaveFlux( e->handleData().flags().is( ArnLinkHandle::Flags::FromPersist));
1815  }
1816  itemValueUpdater( e->handleData(), e->valueData(), itemNet);
1817  break;
1818  }
1820  {
1821  ArnEvAtomicOp* e = static_cast<ArnEvAtomicOp*>( ev);
1822  ArnItemNet* itemNet = static_cast<ArnItemNet*>( static_cast<ArnBasicItem*>( e->target()));
1823  if (!itemNet) break; // No target, deleted/closed ...
1824 
1825  // qDebug() << "ArnSync ArnEvAtomicOp: inItemPath=" << itemNet->path()
1826  // << " op=" << e->op().toString();
1827  atomicOpToFluxQue( e->op(), e->arg1(), e->arg2(), itemNet);
1828  break;
1829  }
1831  {
1832  ArnEvModeChange* e = static_cast<ArnEvModeChange*>( ev);
1833  ArnItemNet* itemNet = static_cast<ArnItemNet*>( static_cast<ArnBasicItem*>( e->target()));
1834  if (!itemNet) break; // No target, deleted/closed ...
1835 
1836  // qDebug() << "ArnSync ArnEvModeChange: path=" << e->path() << " mode=" << e->mode()
1837  // << " inItemPath=" << itemNet->path();
1838  if (!itemNet->isFolder())
1839  itemModeUpdater( itemNet);
1840  break;
1841  }
1843  {
1844  ArnEvMonitor* e = static_cast<ArnEvMonitor*>( ev);
1845  ArnItemNet* itemNet = static_cast<ArnItemNet*>( static_cast<ArnBasicItem*>( e->target()));
1846  if (!itemNet) break; // No target, deleted/closed ...
1847 
1848  // qDebug() << "ArnSync Ev Monitor: type=" << ArnMonEventType::txt().getTxt( e->monEvType())
1849  // << " data=" << e->data() << " isLocal=" << e->isLocal()
1850  // << " isMon=" << itemNet->isMonitor() << " target=" << itemNet->path();
1851  if (e->sessionHandler() == _sessionHandler) // Event is for this session
1852  doArnMonEvent( e->monEvType(), e->data(), e->isLocal(), itemNet);
1853  break;
1854  }
1856  {
1857  ArnEvRetired* e = static_cast<ArnEvRetired*>( ev);
1858  ArnItemNet* itemNet = static_cast<ArnItemNet*>( static_cast<ArnBasicItem*>( e->target()));
1859  if (!itemNet) break; // No target, deleted/closed ...
1860 
1861  if (itemNet->isMonitor()) {
1862  QString destroyPath = e->isBelow() ? e->startLink()->linkPath() : itemNet->path();
1863  // qDebug() << "ArnSync Ev Retired: path=" << destroyPath << " inPath=" << itemNet->path();
1864  doArnMonEvent( ArnMonEventType::ItemDeleted, destroyPath.toUtf8(), true, itemNet);
1865  }
1866 
1867  if (!e->isBelow()) { // Retire is to this item
1868  if (Arn::debugLinkDestroy) qDebug() << "itemRemove: netId=" << itemNet->netId() << " path=" << itemNet->path();
1869  removeItemNetRefs( itemNet);
1870  destroyToFluxQue( itemNet); // This queue contains text not the itemNet
1871  delete itemNet;
1872  e->setTarget( arnNullptr); // Target is now deleted
1873  }
1874  break;
1875  }
1876  default:
1877  break;
1878  }
1879 
1880  ArnBasicItemEventHandler::defaultEvent( ev);
1881 }
static int baseType(int setVal=-1)
Definition: ArnEvent.cpp:62
uint rand()
Definition: Arn.cpp:259
#define ARNSYNCVER
Definition: ArnSync.cpp:48
bool fromXString(const QByteArray &inXString, int size=-1)
static bool isFolder(const QString &path)
Definition: ArnM.cpp:413
bool debugRecInOut
Definition: ArnLib.cpp:41
Monitor of server object for client.
Definition: Arn.hpp:145
const QByteArray * valueData() const
Definition: ArnEvent.hpp:250
const QByteArray & data() const
Definition: ArnEvent.hpp:188
bool debugLinkDestroy
Definition: ArnLib.cpp:40
#define ARNRECNAME
Definition: ArnSync.hpp:45
Container class with string representation for serialized data.
Definition: XStringMap.hpp:107
const QVariant & arg1() const
Definition: ArnEvent.hpp:277
Arn::DataType type() const
The type stored in the Arn Data Object
Definition: ArnItem.hpp:135
QString fullPath(const QString &path)
Convert a path to a full absolute path.
Definition: Arn.cpp:82
Value for Server, can not be set in Client.
Definition: Arn.hpp:158
Assigning same value generates an update of the Arn Data Object
Definition: Arn.hpp:64
The Client session Sync mode at connect & reconnect.
Definition: Arn.hpp:155
const QString pathLocalSys
Definition: Arn.cpp:46
XStringMap & addValues(const QStringList &stringList)
#define ARN_RegExp
Definition: ArnCompat.hpp:70
Nothing allowed.
Definition: Arn.hpp:212
Explicit permanent Master mode, typically an observer or manually setup Master mode.
Definition: Arn.hpp:164
Convenience, allow all.
Definition: Arn.hpp:226
QByteArray arnExport() const
Definition: ArnItem.hpp:345
static QStringList items(const QString &path)
Get the childrens of the folder at path
Definition: ArnM.cpp:315
const ArnLinkHandle & handleData() const
Definition: ArnEvent.hpp:253
int sendId() const
Definition: ArnEvent.hpp:247
QString makePath(const QString &parentPath, const QString &itemName)
Make a path from a parent and an item name.
Definition: Arn.cpp:128
XStringMap & addNum(const char *key, int val)
An Arn object was deleted.
Definition: ArnMonEvent.hpp:51
bool open(const QString &path)
Open a handle to an Arn Data Object
Definition: ArnItemB.cpp:91
ArnLink * startLink() const
Definition: ArnEvent.hpp:210
QByteArray toXString() const
void * sessionHandler() const
Definition: ArnEvent.hpp:194
const QVariant & arg2() const
Definition: ArnEvent.hpp:280
int monEvType() const
Definition: ArnEvent.hpp:185
First local write gives permanent Master mode, typically a client value reporter. ...
Definition: Arn.hpp:162
bool isBelow() const
Definition: ArnEvent.hpp:213
bool debugShareObj
Definition: ArnLib.cpp:42
Internal: start the Monitor.
Definition: ArnMonEvent.hpp:56
static void errorLog(QString errText, ArnError err=ArnError::Undef, void *reference=arnNullptr)
Definition: ArnM.cpp:1025
Internal: restart the Monitor.
Definition: ArnMonEvent.hpp:58
XStringMap & add(const char *key, const QByteArray &val)
bool debugMonitorTest
Definition: ArnLib.cpp:44
Default dynamic auto master mode, general purpose, prohibit Null value sync.
Definition: Arn.hpp:160
void setTarget(void *target)
Definition: ArnEvent.cpp:138
const Op & op() const
Definition: ArnEvent.hpp:274
void * target() const
Definition: ArnEvent.hpp:116
bool isLocal() const
Definition: ArnEvent.hpp:191
Handle for an Arn Data Object.
Definition: ArnItem.hpp:72
Base class handle for an Arn Data Object.