44 #include <QStringList> 48 #define ARNSYNCVER "4.0" 53 ArnSync::ArnSync( QTcpSocket *socket,
bool isClientSide, QObject *parent)
57 _sessionHandler = arnNullptr;
58 _toRemotePathCB = &nullConvertPath;
59 _arnLogin = arnNullptr;
60 _isClientSide = isClientSide;
63 _isClosed = isClientSide;
66 _isConnectStarted = !isClientSide;
68 _isDemandLogin =
false;
87 qDeleteAll( _itemNetMap);
88 qDeleteAll( _fluxRecPool);
89 qDeleteAll( _fluxPipeQueue);
93 void ArnSync::setArnLogin( ArnSyncLogin* arnLogin)
101 connect( _socket, SIGNAL(disconnected()),
this, SLOT(disConnected()));
102 connect( _socket, SIGNAL(readyRead()),
this, SLOT(socketInput()));
103 connect( _socket, SIGNAL(bytesWritten(qint64)),
this, SLOT(sendNext()));
104 connect( &_loginDelayTimer, SIGNAL(timeout()),
this, SLOT(doLoginSeq0End()));
107 _isConnected =
false;
112 if (_isDemandLogin) {
119 setState( State::Normal);
125 void ArnSync::startNormalSync()
127 if (_state == State::Normal)
return;
130 clearNonPipeQueues();
135 QMapIterator<uint,ArnItemNet*> i( _itemNetMap);
136 while (i.hasNext()) {
140 itemNet->resetDirtyValue();
141 itemNet->resetDirtyMode();
143 _syncQueue.enqueue( itemNet);
144 mode = itemNet->getModeString();
146 if (!mode.isEmpty() && !itemNet->isDirtyMode()) {
147 _modeQueue.enqueue( itemNet);
150 bool isMaster = itemNet->isMaster();
152 bool isIniMaster =
false;
153 bool isIniSlave =
false;
155 switch (_clientSyncMode) {
158 if (_remoteVer[0] < 3)
160 isIniMaster = itemNet->localUpdateSinceStop() > 0;
161 isIniSlave = isMaster && isNull;
166 if (itemNet->localUpdateCount() > 0) {
167 itemNet->setMaster();
170 if (_remoteVer[0] < 3)
172 isIniSlave = isMaster && isNull && itemNet->isSaveMode();
175 if (_remoteVer[0] < 3)
177 isIniSlave = isMaster && isNull && itemNet->isSaveMode();
183 itemNet->setIniMaster( isIniMaster);
184 itemNet->setIniSlave( isIniSlave);
185 bool isMasterStart = itemNet->isMasterAtStart();
186 bool isValueBlocked = itemNet->isPipeMode() ||
188 if (isMasterStart && !isValueBlocked) {
189 itemNet->resetEchoSeq();
190 itemNet->setSyncFlux(
true);
191 itemValueUpdater( ArnLinkHandle::null(), arnNullptr, itemNet);
194 setState( State::Normal);
200 void ArnSync::sendXSMap(
const XStringMap& xsMap)
206 void ArnSync::send(
const QByteArray& xString)
212 QByteArray sendString;
213 sendString += xString;
215 sendString +=
"\r\n";
216 _socket->write( sendString);
217 _trafficOut += quint32( sendString.size());
221 void ArnSync::sendNoSync(
const QString& path)
231 void ArnSync::sendSetTree(
const QString& path)
233 if (!_remoteAllow.is( _allow.Write))
return;
243 void ArnSync::sendDelete(
const QString& path)
245 if (!_remoteAllow.is( _allow.Delete))
return;
255 void ArnSync::sendInfo(
int type,
const QByteArray& data)
258 _commandMap.add(
ARNRECNAME,
"info").add(
"type", QByteArray::number( type));
259 _commandMap.add(
"data", data);
261 sendXSMap( _commandMap);
265 void ArnSync::sendMessage(
int type,
const QByteArray& data)
268 _commandMap.add(
ARNRECNAME,
"message").add(
"type", QByteArray::number( type));
269 _commandMap.add(
"data", data);
271 sendXSMap( _commandMap);
286 void ArnSync::sendExit()
295 uint ArnSync::remoteVer(uint index)
297 if (index >= 2)
return 0;
299 return _remoteVer[ index];
304 void ArnSync::setupItemNet( ArnItemNet* itemNet, uint netId)
306 itemNet->setNetId( netId);
307 _itemNetMap.insert( netId, itemNet);
309 itemNet->setEventHandler(
this);
312 if (itemNet->path() ==
"/") {
313 qDebug() <<
"ArnSync setupItemNet root: eventH=" << itemNet->eventHandler()
314 <<
" sessionH=" << itemNet->sessionHandler()
315 <<
" isMon=" << itemNet->isMonitor() <<
" netId=" << itemNet->netId()
316 <<
" itemNet=" << itemNet;
322 void ArnSync::itemValueUpdater(
const ArnLinkHandle& handleData,
const QByteArray* valueData, ArnItemNet* itemNet)
324 if (!itemNet)
return;
326 if (itemNet->isLeadValueUpdate())
327 addToFluxQue( handleData, valueData, itemNet);
331 void ArnSync::itemModeUpdater( ArnItemNet* itemNet)
333 if (!itemNet)
return;
335 if (itemNet->isLeadModeUpdate())
336 addToModeQue( itemNet);
341 ArnItemNet* ArnSync::newNetItem(
const QString& path,
344 if (!_remoteAllow.isAny( _allow.ReadWrite)) {
345 QString remotePath = (*_toRemotePathCB)( _sessionHandler, path);
346 if (!isFreePath( remotePath)) {
348 path +
" remoteAllow=" + _remoteAllow.toString() +
349 " (" + QString::number(_remoteAllow.toInt()) +
")",
355 ArnItemNet* itemNet =
new ArnItemNet( _sessionHandler);
356 if (!itemNet->open( path)) {
361 uint netId = itemNet->linkId();
362 if (_itemNetMap.contains( netId)) {
365 itemNet = _itemNetMap.value( netId, arnNullptr);
366 itemNet->addSyncMode( syncMode,
true);
371 qDebug() <<
"Arn netSync Item already synced: path=" << itemNet->path();
379 itemNet->addSyncMode( syncMode,
true);
380 setupItemNet( itemNet, netId);
381 itemNet->setBlockEcho(
true);
383 if (_isClosed)
return itemNet;
385 _syncQueue.enqueue( itemNet);
397 _clientSyncMode = clientSyncMode;
401 void ArnSync::setSessionHandler(
void* sessionHandler)
403 _sessionHandler = sessionHandler;
407 void ArnSync::setToRemotePathCB( ArnSync::ConVertPathCB toRemotePathCB)
409 _toRemotePathCB = toRemotePathCB;
413 QString ArnSync::nullConvertPath(
void* context,
const QString& path)
421 void ArnSync::setWhoIAm(
const QByteArray& whoIAm)
427 QByteArray ArnSync::remoteWhoIAm()
const 429 return _remoteWhoIAm;
433 QString ArnSync::loginUserName()
const 435 return _loginUserName;
445 void ArnSync::close()
447 _isConnectStarted =
false;
449 if (_isClosed)
return;
459 void ArnSync::closeFinal()
462 _socket->disconnectFromHost();
463 _isConnected =
false;
467 void ArnSync::clearNonPipeQueues()
471 _fluxItemQueue.clear();
475 void ArnSync::clearAllQueues()
477 clearNonPipeQueues();
479 _fluxRecPool += _fluxPipeQueue;
480 _fluxPipeQueue.clear();
484 void ArnSync::setRemoteVer(
const QByteArray& remVer)
486 if (remVer.isEmpty())
return;
489 QList<QByteArray> remVerParts = remVer.split(
'.');
490 int partsNum = qMin( remVerParts.size(), 2);
491 for (
int i = 0; i < partsNum; ++i) {
492 _remoteVer[i] = remVerParts.at(i).toUInt();
495 XStringMap::Options xop;
496 if (_remoteVer[0] >= 4) {
497 xop = xop.NullTilde | xop.RepeatLen | xop.Frame;
499 _commandMap.setOptions( xop);
500 _replyMap.setOptions( xop);
501 _syncMap.setOptions( xop);
505 void ArnSync::setState( ArnSync::State state)
507 if (state == _state)
return;
510 emit stateChanged( _state);
514 bool ArnSync::isFreePath(
const QString& path)
const 516 foreach (
const QString& freePath, _freePathTab) {
517 if (path.startsWith( freePath))
return true;
523 void ArnSync::socketInput()
525 _dataReadBuf.resize(
int(_socket->bytesAvailable()));
526 int nbytes = int(_socket->read( _dataReadBuf.data(), qint64(_dataReadBuf.size())));
527 if (nbytes <= 0)
return;
528 if (_isClosed)
return;
530 _dataReadBuf.resize( nbytes);
531 _dataRemain += _dataReadBuf;
532 _trafficIn += uint( nbytes);
536 while ((pos = _dataRemain.indexOf(
"\n")) >= 0) {
539 xString.append( _dataRemain.constData(), pos);
540 _dataRemain.remove(0, pos + 1);
542 xString.replace(
'\r',
"");
543 _commandMap.fromXString( xString);
549 if (_replyMap.size()) {
550 sendXSMap( _replyMap);
558 void ArnSync::doCommands()
561 QByteArray command = _commandMap.value(0);
564 if (command ==
"flux") {
565 stat = doCommandFlux();
567 else if (command ==
"atomop") {
568 stat = doCommandAtomOp();
570 else if (command ==
"event") {
571 stat = doCommandEvent();
573 else if (command ==
"get") {
574 stat = doCommandGet();
576 else if (command ==
"set") {
577 stat = doCommandSet();
579 else if (command ==
"sync") {
580 stat = doCommandSync();
582 else if (command ==
"mode") {
583 stat = doCommandMode();
585 else if (command ==
"nosync") {
586 stat = doCommandNoSync();
588 else if (command ==
"destroy") {
589 stat = doCommandDelete();
591 else if (command ==
"delete") {
592 stat = doCommandDelete();
594 else if (command ==
"message") {
595 stat = doCommandMessage();
597 else if (command ==
"ls") {
598 stat = doCommandLs();
600 else if (command ==
"info") {
601 stat = doCommandInfo();
603 else if (command ==
"Rinfo") {
604 stat = doCommandRInfo();
606 else if (command ==
"ver") {
607 stat = doCommandVer();
609 else if (command ==
"Rver") {
610 stat = doCommandRVer();
612 else if (command ==
"login") {
613 stat = doCommandLogin();
615 else if (command ==
"exit") {
616 _socket->disconnectFromHost();
619 else if (command ==
"err") {
620 qDebug() <<
"REC-ERR: |" << _commandMap.toXString() <<
"|";
622 else if (command.startsWith(
'R'));
626 _replyMap.add(
"data", QByteArray(
"Unknown record:") + command);
629 if ((_replyMap.size() == 0) && (stat !=
ArnError::Ok)) {
631 _replyMap.add(
"data", QByteArray(
"record:") + command +
632 " errTxt:" + ArnError::txt().getTxt(
int( stat)));
635 if (_replyMap.size()) {
636 _replyMap.add(
"stat", QByteArray::number( stat));
639 if (command.startsWith(
'R')) {
640 emit replyRecord( _commandMap);
645 void ArnSync::startLogin()
650 xsm.
addNum(
"demand", _isDemandLogin).
addNum(
"salt1", _loginSalt1);
656 void ArnSync::loginToArn(
const QString& userName,
const QString& passwordHash,
Arn::Allow allow)
659 _loginUserName = userName;
660 _loginPwHash = passwordHash;
667 void ArnSync::loginToArn()
670 if (_loginUserName.isEmpty())
return;
671 if (_loginNextSeq != -2)
return;
673 QByteArray pwHashX = ArnSyncLogin::pwHashXchg( _loginSalt1, _loginSalt2, _loginPwHash);
675 xsm.
add(
"user", _loginUserName).
add(
"pass", pwHashX);
681 void ArnSync::doLoginSeq0End()
684 _loginDelayTimer.stop();
688 xsm.
add(
"demand", QByteArray::number( _isDemandLogin));
689 xsm.
add(
"salt2", QByteArray::number( _loginSalt2));
695 uint ArnSync::doCommandLogin()
697 if (_isClientSide && (_state != State::Login)) {
701 int seq =_commandMap.value(
"seq").toInt();
710 _loginSalt1 = _commandMap.value(
"salt1").toUInt();
711 if (_loginNextSeq == 0) {
716 _loginDelayTimer.start(2000);
725 _loginSalt2 = _commandMap.value(
"salt2").toUInt();
726 bool isRemoteDemandLogin = _commandMap.value(
"demand").toUInt() != 0;
727 if (_isDemandLogin || isRemoteDemandLogin) {
730 emit loginRequired( _loginReqCode);
744 QByteArray userClient = _commandMap.value(
"user");
745 QByteArray pwHashXClient = _commandMap.value(
"pass");
746 QByteArray pwHashXServer;
747 _loginUserName = QString::fromUtf8( userClient.constData(), userClient.size());
751 const ArnSyncLogin::AccessSlot* accSlot = _arnLogin->findAccess( userClient);
753 QByteArray pwHashX = ArnSyncLogin::pwHashXchg( _loginSalt1, _loginSalt2, accSlot->pwHash);
754 if (pwHashXClient == pwHashX) {
755 _allow = accSlot->allow;
756 pwHashXServer = ArnSyncLogin::pwHashXchg( _loginSalt2, _loginSalt1, accSlot->pwHash);
762 xsm.
add(
"stat", QByteArray::number( stat));
763 xsm.
add(
"allow", QByteArray::number( _allow.toInt()));
764 xsm.
add(
"pass", pwHashXServer);
774 int statServer = _commandMap.value(
"stat").toInt();
775 _remoteAllow = Arn::Allow::fromInt( _commandMap.value(
"allow").toInt());
776 QByteArray pwHashXServer = _commandMap.value(
"pass");
778 QByteArray pwHashX = ArnSyncLogin::pwHashXchg( _loginSalt2, _loginSalt1, _loginPwHash);
782 else if (pwHashXServer != pwHashX)
788 xsm.
add(
"stat", QByteArray::number( stat));
789 xsm.
add(
"allow", QByteArray::number( stat ? _allow.toInt() : 0));
794 emit loginCompleted();
807 int stat = _commandMap.value(
"stat").toInt();
808 _remoteAllow = Arn::Allow::fromInt( _commandMap.value(
"allow").toInt());
812 emit loginCompleted();
813 setState( State::Normal);
825 void ArnSync::getTraffic( quint64& in, quint64& out)
const 832 uint ArnSync::doCommandSync()
836 QByteArray path = _commandMap.value(
"path");
837 QByteArray smode = _commandMap.value(
"smode");
838 uint netId = _commandMap.value(
"id").toUInt();
841 if (_itemNetMap.contains( netId)) {
843 ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
844 qDebug() <<
"ArnSync CommandSync Item already synced: path=" << itemNet->path();
845 removeItemNetRefs( itemNet);
849 bool isCreateAllow = _allow.is( _allow.Create);
851 ArnItemNet* itemNet =
new ArnItemNet( _sessionHandler);
852 if (!itemNet->openWithFlags( path, createFlag)) {
857 setupItemNet( itemNet, netId);
858 itemNet->addSyncModeString( smode,
false);
861 if (syncMode.is( syncMode.
Monitor)) {
862 setupMonitorItem( itemNet);
864 if (!itemNet->getModeString().isEmpty()) {
865 itemModeUpdater( itemNet);
869 itemNet->isPipeMode() ||
871 if (!isBlockedValue && !(itemNet->isMasterAtStart())) {
873 itemNet->setSyncFlux(
true);
874 itemNet->setSaveFlux( itemNet->isSaveMode());
875 itemValueUpdater( ArnLinkHandle::null(), arnNullptr, itemNet);
883 void ArnSync::setupMonitorItem(ArnItemNet *itemNet)
886 itemNet->setMonitor(
true);
889 doChildsToEvent( itemNet);
893 void ArnSync::doChildsToEvent( ArnItemNet *itemNet)
896 QString path = itemNet->path();
897 QStringList childList = itemNet->childItemsMain();
898 foreach (QString childName, childList) {
899 itemNet->sendNewItemMonEvent(
Arn::makePath( path, childName),
true);
904 uint ArnSync::doCommandMode()
908 uint netId = _commandMap.value(
"id").toUInt();
909 QByteArray data = _commandMap.value(
"data");
911 ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
916 itemNet->setModeString( data);
921 uint ArnSync::doCommandNoSync()
924 uint netId = _commandMap.value(
"id").toUInt();
926 ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
931 removeItemNetRefs( itemNet);
937 QString path = _commandMap.valueString(
"path");
938 QList<ArnItemNet*> noSyncList;
940 foreach (ArnItemNet* itemNet, _itemNetMap) {
941 if (itemNet->path().startsWith( path)) {
942 noSyncList += itemNet;
947 foreach (ArnItemNet* itemNet, noSyncList) {
948 removeItemNetRefs( itemNet);
955 uint ArnSync::doCommandFlux()
959 uint netId = _commandMap.value(
"id").toUInt();
960 QByteArray type = _commandMap.value(
"type");
961 QByteArray nqrx = _commandMap.value(
"nqrx");
962 QByteArray seq = _commandMap.value(
"seq");
963 QByteArray data = _commandMap.value(
"data");
964 qint8 echoSeq = qint8(_commandMap.value(
"es",
"-1").toInt());
966 bool isSyncFlux = type.contains(
"I");
967 bool isSaveFlux = type.contains(
"S");
968 bool isOnlyEcho = type.contains(
"E");
969 bool isNull = type.contains(
"N");
971 ArnLinkHandle handleData;
972 handleData.flags().set( ArnLinkHandle::Flags::FromRemote);
974 handleData.add( ArnLinkHandle::QueueFindRegexp,
975 QVariant(
ARN_RegExp( QString::fromUtf8( nqrx.constData(), nqrx.size()))));
977 handleData.add( ArnLinkHandle::SeqNo,
978 QVariant( seq.toInt()));
980 ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
986 bool isEchoPipeBlocked = isOnlyEcho && itemNet->isPipeMode();
987 bool isEchoBidirBlocked = isOnlyEcho && !isSyncFlux && itemNet->isBiDirMode() && (_remoteVer[0] >= 3);
988 bool isEchoMasterBlocked = isOnlyEcho && _isClientSide && itemNet->isMaster() &&
990 bool isEchoSeqBlocked = isOnlyEcho && _isClientSide && itemNet->isEchoSeqOld( echoSeq);
991 bool isValueBlocked = isNullBlocked || isEchoPipeBlocked || isEchoBidirBlocked ||
992 isEchoMasterBlocked || isEchoSeqBlocked;
993 if (!isValueBlocked) {
995 itemNet->setEchoSeq( echoSeq);
996 bool isIgnoreSame = isOnlyEcho;
997 itemNet->arnImport( data, isIgnoreSame, handleData);
999 else if (_isClientSide && isNullBlocked && isSyncFlux
1002 itemNet->setSyncFlux(
true);
1003 itemValueUpdater( ArnLinkHandle::null(), arnNullptr, itemNet);
1009 uint ArnSync::doCommandAtomOp()
1013 uint netId = _commandMap.value(
"id").toUInt();
1014 QByteArray opStr = _commandMap.value(
"op");
1015 QByteArray a1Str = _commandMap.value(
"a1");
1016 QByteArray a2Str = _commandMap.value(
"a2");
1020 ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
1025 if (!itemNet->isAtomicOpProvider())
1030 itemNet->setBits( a1Str.toInt(), a2Str.toInt());
1033 itemNet->addValue( a1Str.toInt());
1036 #ifdef ARNREAL_FLOAT 1037 itemNet->addValue( a1Str.toFloat());
1039 itemNet->addValue( a1Str.toDouble());
1050 uint ArnSync::doCommandEvent()
1055 uint netId = _commandMap.value(
"id").toUInt();
1056 QByteArray typeStr = _commandMap.value(
"type");
1057 QByteArray data = _commandMap.value(
"data");
1059 int type = ArnMonEventType::txt().getEnumVal( typeStr.constData(),
1061 ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
1069 itemNet->sendMonEvent( type, data,
false);
1074 uint ArnSync::doCommandSet()
1079 QByteArray path = _commandMap.value(
"path");
1080 QByteArray data = _commandMap.value(
"data");
1082 _replyMap.add(
ARNRECNAME,
"Rset").add(
"path", path);
1084 bool isCreateAllow = _allow.is( _allow.Create);
1086 ArnItemNet item( _sessionHandler);
1087 if (!item.openWithFlags( path, createFlag)) {
1091 if (!item.isFolder()) {
1092 ArnLinkHandle handleData;
1093 handleData.flags().set( ArnLinkHandle::Flags::FromRemote);
1101 uint ArnSync::doCommandGet()
1105 QByteArray path = _commandMap.value(
"path");
1108 _replyMap.add(
ARNRECNAME,
"Rget").add(
"path", path);
1110 bool isCreateAllow = _allow.is( _allow.Create);
1112 if (!item.
open( path)) {
1118 if (!type.isEmpty())
1119 _replyMap.add(
"type", type);
1121 _replyMap.add(
"data", item.
arnExport());
1126 uint ArnSync::doCommandLs()
1130 QByteArray path = _commandMap.value(
"path");
1133 _replyMap.add(
ARNRECNAME,
"Rls").add(
"path", path);
1137 int nItems = subitems.size();
1139 for (
int i = 0; i < nItems; ++i) {
1140 _replyMap.add(
"item", uint(i + 1), subitems.at(i));
1151 uint ArnSync::doCommandDelete()
1155 uint netId = _commandMap.value(
"id",
"0").toUInt();
1158 ArnItemNet* itemNet = _itemNetMap.value( netId, arnNullptr);
1163 itemNet->setDisable();
1164 itemNet->destroyLink();
1167 QByteArray path = _commandMap.value(
"path");
1170 emit xcomDelete( path);
1177 uint ArnSync::doCommandMessage()
1179 int type = _commandMap.value(
"type").toInt();
1180 QByteArray data = _commandMap.value(
"data");
1182 emit messageReceived( type, data);
1188 uint ArnSync::doCommandInfo()
1194 int type = _commandMap.value(
"type").toInt();
1195 QByteArray data = _commandMap.value(
"data");
1203 xmOut.
add( _customMap);
1206 case InfoType::FreePaths:
1209 case InfoType::WhoIAm:
1210 _remoteWhoIAm = data;
1218 emit infoReceived( type);
1220 _replyMap.add(
ARNRECNAME,
"Rinfo").add(
"type", QByteArray::number( type));
1221 _replyMap.add(
"data", xmOut.
toXString());
1226 uint ArnSync::doCommandRInfo()
1231 if (_state == State::Info) {
1232 int type = _commandMap.value(
"type").toInt();
1233 QByteArray data = _commandMap.value(
"data");
1235 doInfoInternal( type, data);
1237 emit infoReceived( type);
1244 uint ArnSync::doCommandVer()
1249 if (_state == State::Init) {
1250 setRemoteVer( _commandMap.value(
"ver",
"1.0"));
1251 if (_remoteVer[0] >= 2)
1252 setState( State::Login);
1254 setState( State::Normal);
1257 setRemoteVer( _commandMap.value(
"ver",
""));
1265 uint ArnSync::doCommandRVer()
1270 if (_state == State::Version) {
1271 setRemoteVer( _commandMap.value(
"ver",
"1.0"));
1272 if (_remoteVer[0] >= 2) {
1273 setState( State::Info);
1274 _curInfoType = InfoType::Start;
1275 doInfoInternal( InfoType::Start);
1277 else if (!_isDemandLogin) {
1282 setState( State::Login);
1283 emit loginRequired(3);
1291 bool ArnSync::isDemandLogin()
const 1293 return _isDemandLogin;
1297 void ArnSync::setDemandLogin(
bool isDemandLogin)
1299 _isDemandLogin = isDemandLogin;
1303 void ArnSync::addFreePath(
const QString& path)
1305 if (!_freePathTab.contains( path))
1306 _freePathTab += path;
1310 QStringList ArnSync::freePaths()
const 1312 return _freePathTab;
1316 void ArnSync::connectStarted()
1318 if (!_isConnectStarted) {
1319 _isConnectStarted =
true;
1325 void ArnSync::connected()
1327 if (!_isClientSide)
return;
1330 _isConnected =
true;
1333 setState( State::Version);
1340 void ArnSync::disConnected()
1342 _isConnected =
false;
1345 if (_isClientSide) {
1349 foreach (ArnItemNet* itemNet, _itemNetMap) {
1350 itemNet->onConnectStop();
1355 QList<uint> destroyList;
1356 foreach (ArnItemNet* itemNet, _itemNetMap) {
1357 if (itemNet->isAutoDestroy()) {
1358 destroyList += itemNet->netId();
1363 foreach (uint netId, destroyList) {
1364 ArnItemNet* itemNet = _itemNetMap.value( netId, 0);
1367 itemNet->destroyLink(
true);
1376 void ArnSync::removeItemNetRefs( ArnItemNet* itemNet)
1378 if (!itemNet)
return;
1381 s = _itemNetMap.remove( itemNet->netId());
1383 s = _syncQueue.removeAll( itemNet);
1385 s = _modeQueue.removeAll( itemNet);
1387 s = _fluxItemQueue.removeAll( itemNet);
1393 void ArnSync::doArnMonEvent(
int type,
const QByteArray& data,
bool isLocal, ArnItemNet* itemNet)
1396 ArnM::errorLog( QString(tr(
"Can't get ArnItemNet sender for doArnEvent")),
1406 if (_remoteAllow.is( _allow.Read)
1407 || (isFreePath( (*_toRemotePathCB)( _sessionHandler, itemNet->path()))))
1409 eventToFluxQue( itemNet->netId(), type, data);
1420 if (isLocal && _isClientSide) {
1421 itemNet->addSyncMode( syncMode.
Monitor,
true);
1423 else if (!isLocal && !_isClientSide && !syncMode.is( syncMode.
Monitor)) {
1424 setupMonitorItem( itemNet);
1425 itemNet->addSyncMode( syncMode.
Monitor,
false);
1432 if (!isLocal && !_isClientSide) {
1434 doChildsToEvent( itemNet);
1440 void ArnSync::doInfoInternal(
int infoType,
const QByteArray& data)
1445 if (infoType != _curInfoType) {
1446 emit loginRequired(4);
1451 case InfoType::Start:
1452 _curInfoType = InfoType::FreePaths;
1453 sendInfo( InfoType::FreePaths);
1455 case InfoType::FreePaths:
1456 _freePathTab = xmIn.values();
1457 _curInfoType = InfoType::WhoIAm;
1458 sendInfo( InfoType::WhoIAm, _whoIAm);
1460 case InfoType::WhoIAm:
1461 _remoteWhoIAm = data;
1463 setState( State::Login);
1473 void ArnSync::addToFluxQue(
const ArnLinkHandle& handleData,
const QByteArray* valueData,
1474 ArnItemNet* itemNet)
1476 if (!itemNet)
return;
1478 if (itemNet->isPipeMode()) {
1479 if (!_isConnectStarted)
return;
1481 if (itemNet->isOnlyEcho()
1483 || (!_remoteAllow.is( _allow.Write)
1484 && (_isClientSide || !isFreePath( itemNet->path()))))
1488 itemNet->resetDirtyValue();
1492 FluxRec* fluxRec = getFreeFluxRec();
1493 fluxRec->xString += makeFluxString( itemNet, handleData, valueData);
1494 itemNet->resetDirtyValue();
1496 if (handleData.has( ArnLinkHandle::QueueFindRegexp)) {
1497 ARN_RegExp rx( handleData.valueRef( ArnLinkHandle::QueueFindRegexp).ARN_ToRegExp());
1500 for (i = 0; i < _fluxPipeQueue.size(); ++i) {
1501 FluxRec*& fluxRecQ = _fluxPipeQueue[i];
1502 _syncMap.fromXString( fluxRecQ->xString);
1503 QString fluxDataStrQ = _syncMap.valueString(
"data");
1504 if (rx.indexIn( fluxDataStrQ) >= 0) {
1507 _fluxRecPool += fluxRecQ;
1516 _fluxPipeQueue.enqueue( fluxRec);
1522 _fluxPipeQueue.enqueue( fluxRec);
1526 if (_isClosed)
return;
1528 bool isEchoBidirBlocked = itemNet->isOnlyEcho() && itemNet->isBiDirMode() &&
1529 !itemNet->isSyncFlux();
1530 bool isEchoMasterBlocked = !_isClientSide && itemNet->isMaster() && itemNet->isOnlyEcho() &&
1531 !itemNet->isSyncFlux();
1532 bool isRemAllowBlocked = !_remoteAllow.is( _allow.Write)
1533 && (_isClientSide || !isFreePath( itemNet->path()));
1534 if (isEchoBidirBlocked || isEchoMasterBlocked || isRemAllowBlocked) {
1535 itemNet->resetDirtyValue();
1539 itemNet->setQueueNum( ++_queueNumCount);
1540 _fluxItemQueue.enqueue( itemNet);
1549 void ArnSync::eventToFluxQue( uint netId,
int type,
const QByteArray& data)
1552 if (!_isConnectStarted)
return;
1555 FluxRec* fluxRec = getFreeFluxRec();
1558 _syncMap.add(
"id", QByteArray::number( netId));
1559 _syncMap.add(
"type", typeStr);
1560 _syncMap.add(
"data", data);
1561 fluxRec->xString += _syncMap.toXString();
1562 _fluxPipeQueue.enqueue( fluxRec);
1570 void ArnSync::atomicOpToFluxQue(
int op,
const QVariant& arg1,
const QVariant& arg2,
const ArnItemNet* itemNet)
1572 if (!itemNet)
return;
1573 if (!_isConnectStarted)
return;
1576 FluxRec* fluxRec = getFreeFluxRec();
1578 _syncMap.add(
ARNRECNAME,
"atomop").add(
"id", QByteArray::number( itemNet->netId()));
1579 _syncMap.add(
"op", opStr);
1581 _syncMap.add(
"a1", arg1.toString());
1583 _syncMap.add(
"a2", arg2.toString());
1584 fluxRec->xString += _syncMap.toXString();
1585 _fluxPipeQueue.enqueue( fluxRec);
1593 void ArnSync::destroyToFluxQue( ArnItemNet* itemNet)
1595 if (itemNet->isDisable())
return;
1596 if (!_isConnectStarted)
return;
1597 if (!_remoteAllow.is( _allow.Delete))
return;
1599 ArnLink::RetireType rt = ArnLink::RetireType::fromInt( itemNet->retireType());
1600 if ((rt == rt.Tree) || (rt == rt.None))
return;
1602 bool isGlobal = (rt == rt.LeafGlobal) || !_isClientSide;
1603 FluxRec* fluxRec = getFreeFluxRec();
1605 const char* delCmd = (_remoteVer[0] >= 2) ?
"delete" :
"destroy";
1606 _syncMap.add(
ARNRECNAME, isGlobal ? delCmd :
"nosync")
1607 .add(
"id", QByteArray::number( itemNet->netId()));
1608 fluxRec->xString += _syncMap.toXString();
1609 _fluxPipeQueue.enqueue( fluxRec);
1617 ArnSync::FluxRec* ArnSync::getFreeFluxRec()
1621 if (_fluxRecPool.empty()) {
1622 fluxRec =
new FluxRec;
1625 fluxRec = _fluxRecPool.takeLast();
1627 fluxRec->xString.resize(0);
1628 fluxRec->queueNum = ++_queueNumCount;
1634 void ArnSync::addToModeQue( ArnItemNet* itemNet)
1636 if (_isClosed)
return;
1637 if (!itemNet)
return;
1639 if (!_remoteAllow.is( _allow.ModeChange)
1640 && (_isClientSide || !isFreePath( itemNet->path())))
1642 itemNet->resetDirtyMode();
1646 _modeQueue.enqueue( itemNet);
1654 void ArnSync::sendNext()
1658 if (!_isConnected || !_socket->isValid())
return;
1659 if (_state != State::Normal) {
1665 ArnItemNet* itemNet;
1667 if (!_syncQueue.isEmpty()) {
1668 itemNet = _syncQueue.dequeue();
1669 sendSyncItem( itemNet);
1672 else if (!_modeQueue.isEmpty()) {
1673 itemNet = _modeQueue.dequeue();
1674 sendModeItem( itemNet);
1675 itemNet->resetDirtyMode();
1679 int itemQueueNum = _fluxItemQueue.isEmpty() ? _queueNumDone + INT_MAX : _fluxItemQueue.head()->queueNum();
1680 int pipeQueueNum = _fluxPipeQueue.isEmpty() ? _queueNumDone + INT_MAX : _fluxPipeQueue.head()->queueNum;
1681 int itemQueueRel = itemQueueNum - _queueNumDone;
1682 int pipeQueueRel = pipeQueueNum - _queueNumDone;
1684 if ((itemQueueRel < INT_MAX) || (pipeQueueRel < INT_MAX)) {
1685 if (itemQueueRel < pipeQueueRel) {
1686 _queueNumDone = itemQueueNum;
1688 itemNet = _fluxItemQueue.dequeue();
1689 sendFluxItem( itemNet);
1690 itemNet->resetDirtyValue();
1693 _queueNumDone = pipeQueueNum;
1695 FluxRec* fluxRec = _fluxPipeQueue.dequeue();
1696 _fluxRecPool += fluxRec;
1697 send( fluxRec->xString);
1709 QByteArray ArnSync::makeFluxString(
const ArnItemNet* itemNet,
const ArnLinkHandle& handleData,
1710 const QByteArray* valueData)
1713 if (itemNet->isSyncFlux()) type +=
"I";
1714 if (itemNet->isOnlyEcho()) type +=
"E";
1715 if (itemNet->isSaveFlux()) type +=
"S";
1719 _syncMap.add(
ARNRECNAME,
"flux").add(
"id", QByteArray::number( itemNet->netId()));
1721 if (!type.isEmpty())
1722 _syncMap.add(
"type", type);
1723 qint8 echoSeq = itemNet->echoSeq();
1725 _syncMap.addNum(
"es",
int(echoSeq));
1727 if (handleData.has( ArnLinkHandle::QueueFindRegexp))
1728 _syncMap.add(
"nqrx", handleData.valueRef( ArnLinkHandle::QueueFindRegexp).ARN_ToRegExp().pattern());
1729 else if (handleData.has( ArnLinkHandle::SeqNo))
1730 _syncMap.add(
"seq", QByteArray::number( handleData.valueRef( ArnLinkHandle::SeqNo).toInt()));
1732 _syncMap.add(
"data", valueData ? *valueData : itemNet->arnExport());
1734 return _syncMap.toXString();
1738 void ArnSync::sendFluxItem(
const ArnItemNet* itemNet)
1740 if (!itemNet || !itemNet->isOpen()) {
1745 send( makeFluxString( itemNet, ArnLinkHandle::null(), arnNullptr));
1749 void ArnSync::sendSyncItem( ArnItemNet* itemNet)
1751 if (!itemNet || !itemNet->isOpen()) {
1758 _syncMap.add(
"path", (*_toRemotePathCB)( _sessionHandler, itemNet->path()));
1759 _syncMap.add(
"id", QByteArray::number( itemNet->netId()));
1760 QByteArray smode = itemNet->getSyncModeString();
1761 if (!smode.isEmpty()) {
1762 _syncMap.add(
"smode", smode);
1766 <<
", " << _syncMap.toXString();
1767 sendXSMap( _syncMap);
1771 void ArnSync::sendModeItem( ArnItemNet* itemNet)
1773 if (!itemNet || !itemNet->isOpen()) {
1780 _syncMap.add(
"id", QByteArray::number( itemNet->netId()));
1781 _syncMap.add(
"data", itemNet->getModeString());
1782 sendXSMap( _syncMap);
1786 void ArnSync::customEvent( QEvent* ev)
1796 ArnItemNet* itemNet =
static_cast<ArnItemNet*
>(
static_cast<ArnBasicItem*
>( e->
target()));
1797 if (!itemNet)
break;
1799 quint32 sendId = e->
sendId();
1800 bool isBlocked = itemNet->isBlock( sendId);
1806 itemNet->addIsOnlyEcho( sendId);
1807 if (_isClientSide) {
1808 itemNet->nextEchoSeq();
1809 itemNet->setSyncFlux(
false);
1811 else if (!itemNet->isOnlyEcho()) {
1812 itemNet->resetEchoSeq();
1813 itemNet->setSyncFlux(
false);
1814 itemNet->setSaveFlux( e->
handleData().flags().is( ArnLinkHandle::Flags::FromPersist));
1822 ArnItemNet* itemNet =
static_cast<ArnItemNet*
>(
static_cast<ArnBasicItem*
>( e->
target()));
1823 if (!itemNet)
break;
1827 atomicOpToFluxQue( e->
op(), e->
arg1(), e->
arg2(), itemNet);
1833 ArnItemNet* itemNet =
static_cast<ArnItemNet*
>(
static_cast<ArnBasicItem*
>( e->
target()));
1834 if (!itemNet)
break;
1838 if (!itemNet->isFolder())
1839 itemModeUpdater( itemNet);
1845 ArnItemNet* itemNet =
static_cast<ArnItemNet*
>(
static_cast<ArnBasicItem*
>( e->
target()));
1846 if (!itemNet)
break;
1858 ArnItemNet* itemNet =
static_cast<ArnItemNet*
>(
static_cast<ArnBasicItem*
>( e->
target()));
1859 if (!itemNet)
break;
1861 if (itemNet->isMonitor()) {
1862 QString destroyPath = e->
isBelow() ? e->
startLink()->linkPath() : itemNet->path();
1868 if (
Arn::debugLinkDestroy) qDebug() <<
"itemRemove: netId=" << itemNet->netId() <<
" path=" << itemNet->path();
1869 removeItemNetRefs( itemNet);
1870 destroyToFluxQue( itemNet);
1880 ArnBasicItemEventHandler::defaultEvent( ev);
static int baseType(int setVal=-1)
bool fromXString(const QByteArray &inXString, int size=-1)
static bool isFolder(const QString &path)
Monitor of server object for client.
const QByteArray * valueData() const
const QByteArray & data() const
Container class with string representation for serialized data.
const QVariant & arg1() const
Arn::DataType type() const
The type stored in the Arn Data Object
QString fullPath(const QString &path)
Convert a path to a full absolute path.
Value for Server, can not be set in Client.
Assigning same value generates an update of the Arn Data Object
The Client session Sync mode at connect & reconnect.
const QString pathLocalSys
XStringMap & addValues(const QStringList &stringList)
Explicit permanent Master mode, typically an observer or manually setup Master mode.
QByteArray arnExport() const
static QStringList items(const QString &path)
Get the childrens of the folder at path
const ArnLinkHandle & handleData() const
QString makePath(const QString &parentPath, const QString &itemName)
Make a path from a parent and an item name.
XStringMap & addNum(const char *key, int val)
An Arn object was deleted.
bool open(const QString &path)
Open a handle to an Arn Data Object
Link flags when accessing an Arn Data Object
ArnLink * startLink() const
QByteArray toXString() const
void * sessionHandler() const
const QVariant & arg2() const
First local write gives permanent Master mode, typically a client value reporter. ...
Internal: start the Monitor.
static void errorLog(QString errText, ArnError err=ArnError::Undef, void *reference=arnNullptr)
Internal: restart the Monitor.
XStringMap & add(const char *key, const QByteArray &val)
Default dynamic auto master mode, general purpose, prohibit Null value sync.
void setTarget(void *target)
Handle for an Arn Data Object.
Base class handle for an Arn Data Object.