2
0
mirror of https://github.com/kotatogram/kotatogram-desktop synced 2025-08-31 06:35:14 +00:00

Move some things from SessionData.

This commit is contained in:
John Preston
2019-11-20 13:41:14 +03:00
parent 885738ac32
commit bdc7f4114f
8 changed files with 280 additions and 355 deletions

View File

@@ -39,6 +39,7 @@ constexpr auto kPingSendAfterForce = 45 * crl::time(1000);
constexpr auto kTemporaryExpiresIn = TimeId(10);
constexpr auto kBindKeyAdditionalExpiresTimeout = TimeId(30);
constexpr auto kTestModeDcIdShift = 10000;
constexpr auto kCheckSentRequestsEach = 1 * crl::time(1000);
// If we can't connect for this time we will ask _instance to update config.
constexpr auto kRequestConfigTimeout = 8 * crl::time(1000);
@@ -46,9 +47,19 @@ constexpr auto kRequestConfigTimeout = 8 * crl::time(1000);
// Don't try to handle messages larger than this size.
constexpr auto kMaxMessageLength = 16 * 1024 * 1024;
// How much time passed from send till we resend request or check its state.
constexpr auto kCheckSentRequestTimeout = 10 * crl::time(1000);
// How much time to wait for some more requests,
// when resending request or checking its state.
constexpr auto kSendStateRequestWaiting = crl::time(1000);
// Container lives 10 minutes in haveSent map.
constexpr auto kContainerLives = TimeId(600);
using namespace details;
QString LogIdsVector(const QVector<MTPlong> &ids) {
[[nodiscard]] QString LogIdsVector(const QVector<MTPlong> &ids) {
if (!ids.size()) return "[]";
auto idsStr = QString("[%1").arg(ids.cbegin()->v);
for (const auto &id : ids) {
@@ -57,6 +68,15 @@ QString LogIdsVector(const QVector<MTPlong> &ids) {
return idsStr + "]";
}
[[nodiscard]] QString LogIds(const QVector<uint64> &ids) {
if (!ids.size()) return "[]";
auto idsStr = QString("[%1").arg(*ids.cbegin());
for (const auto id : ids) {
idsStr += QString(", %2").arg(id);
}
return idsStr + "]";
}
void wrapInvokeAfter(SecureRequest &to, const SecureRequest &from, const RequestMap &haveSent, int32 skipBeforeRequest = 0) {
const auto afterId = *(mtpMsgId*)(from->after->data() + 4);
const auto i = afterId ? haveSent.constFind(afterId) : haveSent.cend();
@@ -200,6 +220,47 @@ int16 ConnectionPrivate::getProtocolDcId() const {
: testedDcId;
}
void ConnectionPrivate::checkSentRequests() {
QVector<mtpMsgId> removingIds; // remove very old (10 minutes) containers and resend requests
auto requesting = false;
{
QReadLocker locker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
const auto haveSentCount = haveSent.size();
auto ms = crl::now();
for (auto i = haveSent.begin(), e = haveSent.end(); i != e; ++i) {
auto &req = i.value();
if (req->msDate > 0) {
if (req->msDate + kCheckSentRequestTimeout < ms) {
// Need to check state.
req->msDate = ms;
if (_stateRequestData.emplace(i.key()).second) {
requesting = true;
}
}
} else if (base::unixtime::now()
> int32(i.key() >> 32) + kContainerLives) {
removingIds.reserve(haveSentCount);
removingIds.push_back(i.key());
}
}
}
if (requesting) {
_sessionData->queueSendAnything(kSendStateRequestWaiting);
}
if (!removingIds.isEmpty()) {
QWriteLocker locker(_sessionData->haveSentMutex());
auto &haveSent = _sessionData->haveSentMap();
for (uint32 i = 0, l = removingIds.size(); i < l; ++i) {
auto j = haveSent.find(removingIds[i]);
if (j != haveSent.cend()) {
Assert(!j.value()->requestId);
haveSent.erase(j);
}
}
}
}
void ConnectionPrivate::destroyAllConnections() {
clearUnboundKeyCreator();
_waitForBetterTimer.cancel();
@@ -228,12 +289,16 @@ ConnectionPrivate::ConnectionPrivate(
, _waitForReceived(kMinReceiveTimeout)
, _waitForConnected(kMinConnectedTimeout)
, _pingSender(thread, [=] { sendPingByTimer(); })
, _checkSentRequestsTimer(thread, [=] { checkSentRequests(); })
, _sessionData(std::move(data)) {
Expects(_shiftedDcId != 0);
moveToThread(thread);
connect(thread, &QThread::started, this, [=] { connectToServer(); });
connect(thread, &QThread::started, this, [=] {
_checkSentRequestsTimer.callEach(kCheckSentRequestsEach);
connectToServer();
});
connect(thread, &QThread::finished, this, [=] { finishAndDestroy(); });
connect(_sessionData->owner(), SIGNAL(authKeyChanged()), this, SLOT(updateAuthKey()), Qt::QueuedConnection);
@@ -315,19 +380,42 @@ void ConnectionPrivate::resetSession() {
_needSessionReset = false;
DEBUG_LOG(("MTP Info: creating new session in resetSession."));
_sessionData->changeSessionId();
// #TODO move to sessionData, clear on changeSessionIdLocked.
_ackRequestData.clear();
_resendRequestData.clear();
{
QWriteLocker locker5(_sessionData->stateRequestMutex());
_sessionData->stateRequestMap().clear();
}
changeSessionId();
_sessionData->queueResetDone();
}
void ConnectionPrivate::changeSessionId() {
auto sessionId = _sessionId;
do {
sessionId = openssl::RandomValue<uint64>();
} while (_sessionId == sessionId);
DEBUG_LOG(("MTP Info: setting server_session: %1").arg(sessionId));
_sessionId = sessionId;
_messagesCounter = 0;
_sessionMarkedAsStarted = false;
_ackRequestData.clear();
_resendRequestData.clear();
_stateRequestData.clear();
_receivedMessageIds.clear();
}
uint32 ConnectionPrivate::nextRequestSeqNumber(bool needAck) {
const auto result = _messagesCounter;
_messagesCounter += (needAck ? 1 : 0);
return result * 2 + (needAck ? 1 : 0);
}
bool ConnectionPrivate::markSessionAsStarted() {
if (_sessionMarkedAsStarted) {
return false;
}
_sessionMarkedAsStarted = true;
return true;
}
mtpMsgId ConnectionPrivate::prepareToSend(
SecureRequest &request,
mtpMsgId currentLastId,
@@ -349,7 +437,7 @@ mtpMsgId ConnectionPrivate::prepareToSend(
: msgId;
}
request.setMsgId(currentLastId);
request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck()));
request.setSeqNo(nextRequestSeqNumber(request.needAck()));
if (request->requestId) {
MTP_LOG(_shiftedDcId, ("[r%1] msg_id 0 -> %2").arg(request->requestId).arg(currentLastId));
}
@@ -412,7 +500,7 @@ mtpMsgId ConnectionPrivate::replaceMsgId(SecureRequest &request, mtpMsgId newId)
}
request.setMsgId(newId);
request.setSeqNo(_sessionData->nextRequestSeqNumber(request.needAck()));
request.setSeqNo(nextRequestSeqNumber(request.needAck()));
return newId;
}
@@ -455,8 +543,7 @@ void ConnectionPrivate::tryToSend() {
&& _pingSendAt <= crl::now()) {
_pingIdToSend = openssl::RandomValue<mtpPingId>();
}
const auto forceNewMsgId = sendAll
&& _sessionData->markSessionAsStarted();
const auto forceNewMsgId = sendAll && markSessionAsStarted();
if (forceNewMsgId && _keyCreator) {
_keyCreator->restartBinder();
}
@@ -504,22 +591,14 @@ void ConnectionPrivate::tryToSend() {
MTP_msg_resend_req(MTP_vector<MTPlong>(
base::take(_resendRequestData)))));
}
auto stateReq = QVector<MTPlong>();
{
QWriteLocker locker(_sessionData->stateRequestMutex());
auto &ids = _sessionData->stateRequestMap();
if (!ids.isEmpty()) {
stateReq.reserve(ids.size());
for (auto i = ids.cbegin(), e = ids.cend(); i != e; ++i) {
stateReq.push_back(MTP_long(i.key()));
}
if (!_stateRequestData.empty()) {
auto ids = QVector<MTPlong>();
ids.reserve(_stateRequestData.size());
for (const auto id : base::take(_stateRequestData)) {
ids.push_back(MTP_long(id));
}
ids.clear();
}
if (!stateReq.isEmpty()) {
stateRequest = SecureRequest::Serialize(MTPMsgsStateReq(
MTP_msgs_state_req(MTP_vector<MTPlong>(stateReq))));
MTP_msgs_state_req(MTP_vector<MTPlong>(ids))));
// Add to haveSent / wereAcked maps, but don't add to requestMap.
stateRequest->requestId = GetNextRequestId();
}
@@ -530,14 +609,13 @@ void ConnectionPrivate::tryToSend() {
if (_keyCreator && _keyCreator->bindReadyToRequest()) {
bindDcKeyRequest = _keyCreator->prepareBindRequest(
_temporaryKey,
_sessionData->getSessionId());
_sessionId);
// This is a special request with msgId used inside the message
// body, so it is prepared already with a msgId and we place
// seqNo for it manually here.
bindDcKeyRequest.setSeqNo(
_sessionData->nextRequestSeqNumber(
bindDcKeyRequest.needAck()));
nextRequestSeqNumber(bindDcKeyRequest.needAck()));
//} else if (!_keyChecker) {
// if (const auto &keyForCheck = _sessionData->getKeyForCheck()) {
// _keyChecker = std::make_unique<details::DcKeyChecker>(
@@ -546,14 +624,13 @@ void ConnectionPrivate::tryToSend() {
// keyForCheck);
// bindDcKeyRequest = _keyChecker->prepareRequest(
// _temporaryKey,
// _sessionData->getSessionId());
// _sessionId);
// // This is a special request with msgId used inside the message
// // body, so it is prepared already with a msgId and we place
// // seqNo for it manually here.
// bindDcKeyRequest.setSeqNo(
// _sessionData->nextRequestSeqNumber(
// bindDcKeyRequest.needAck()));
// nextRequestSeqNumber(bindDcKeyRequest.needAck()));
// }
}
}
@@ -1229,10 +1306,9 @@ void ConnectionPrivate::handleReceived() {
TCP_LOG(("TCP Info: decrypted message %1,%2,%3 is %4 len").arg(msgId).arg(seqNo).arg(Logs::b(needAck)).arg(fullDataLength));
uint64 serverSession = _sessionData->getSessionId();
if (session != serverSession) {
if (session != _sessionId) {
LOG(("MTP Error: bad server session received"));
TCP_LOG(("MTP Error: bad server session %1 instead of %2 in message received").arg(session).arg(serverSession));
TCP_LOG(("MTP Error: bad server session %1 instead of %2 in message received").arg(session).arg(_sessionId));
return restart();
}
@@ -1247,23 +1323,22 @@ void ConnectionPrivate::handleReceived() {
}
bool badTime = false;
uint64 mySalt = _sessionData->getSalt();
if (serverTime > clientTime + 60 || serverTime + 300 < clientTime) {
DEBUG_LOG(("MTP Info: bad server time from msg_id: %1, my time: %2").arg(serverTime).arg(clientTime));
badTime = true;
}
bool wasConnected = (getState() == ConnectedState);
if (serverSalt != mySalt) {
if (serverSalt != _sessionSalt) {
if (!badTime) {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(mySalt));
_sessionData->setSalt(serverSalt);
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2, updating...").arg(serverSalt).arg(_sessionSalt));
_sessionSalt = serverSalt;
if (setState(ConnectedState, ConnectingState)) {
_sessionData->resendAll();
}
} else {
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(mySalt));
DEBUG_LOG(("MTP Info: other salt received... received: %1, my salt: %2").arg(serverSalt).arg(_sessionSalt));
}
} else {
serverSalt = 0; // dont pass to handle method, so not to lock in setSalt()
@@ -1277,18 +1352,10 @@ void ConnectionPrivate::handleReceived() {
auto sfrom = decryptedInts + 4U; // msg_id + seq_no + length + message
MTP_LOG(_shiftedDcId, ("Recv: ") + details::DumpToText(sfrom, end) + QString(" (keyId:%1)").arg(_temporaryKey->keyId()));
bool needToHandle = false;
{
QWriteLocker lock(_sessionData->receivedIdsMutex());
needToHandle = _sessionData->receivedIdsSet().registerMsgId(msgId, needAck);
}
if (needToHandle) {
if (_receivedMessageIds.registerMsgId(msgId, needAck)) {
res = handleOneReceived(from, end, msgId, serverTime, serverSalt, badTime);
}
{
QWriteLocker lock(_sessionData->receivedIdsMutex());
_sessionData->receivedIdsSet().shrink();
}
_receivedMessageIds.shrink();
// send acks
if (const auto toAckSize = _ackRequestData.size()) {
@@ -1389,13 +1456,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
return HandleResult::ParseError;
}
bool needToHandle = false;
{
QWriteLocker lock(_sessionData->receivedIdsMutex());
needToHandle = _sessionData->receivedIdsSet().registerMsgId(inMsgId.v, needAck);
}
auto res = HandleResult::Success; // if no need to handle, then succeed
if (needToHandle) {
if (_receivedMessageIds.registerMsgId(inMsgId.v, needAck)) {
res = handleOneReceived(from, otherEnd, inMsgId.v, serverTime, serverSalt, badTime);
badTime = false;
}
@@ -1489,7 +1551,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
}
if (needResend) { // bad msg_id or bad container
if (serverSalt) _sessionData->setSalt(serverSalt);
if (serverSalt) {
_sessionSalt = serverSalt;
}
base::unixtime::update(serverTime, true);
DEBUG_LOG(("Message Info: unixtime updated, now %1, resending in container...").arg(serverTime));
@@ -1497,7 +1561,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
resend(resendId, 0, true);
} else { // must create new session, because msg_id and msg_seqno are inconsistent
if (badTime) {
if (serverSalt) _sessionData->setSalt(serverSalt);
if (serverSalt) {
_sessionSalt = serverSalt;
}
base::unixtime::update(serverTime, true);
badTime = false;
}
@@ -1543,8 +1609,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
return (badTime ? HandleResult::Ignored : HandleResult::Success);
}
uint64 serverSalt = data.vnew_server_salt().v;
_sessionData->setSalt(serverSalt);
_sessionSalt = data.vnew_server_salt().v;
base::unixtime::update(serverTime);
if (setState(ConnectedState, ConnectingState)) {
@@ -1573,10 +1638,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
QByteArray info(idsCount, Qt::Uninitialized);
{
QReadLocker lock(_sessionData->receivedIdsMutex());
auto &receivedIds = _sessionData->receivedIdsSet();
auto minRecv = receivedIds.min();
auto maxRecv = receivedIds.max();
const auto minRecv = _receivedMessageIds.min();
const auto maxRecv = _receivedMessageIds.max();
QReadLocker locker(_sessionData->wereAckedMutex());
const auto &wereAcked = _sessionData->wereAckedMap();
@@ -1590,15 +1653,15 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
} else if (reqMsgId > maxRecv) {
state |= 0x03;
} else {
auto msgIdState = receivedIds.lookup(reqMsgId);
if (msgIdState == ReceivedMsgIds::State::NotFound) {
auto msgIdState = _receivedMessageIds.lookup(reqMsgId);
if (msgIdState == ReceivedIdsManager::State::NotFound) {
state |= 0x02;
} else {
state |= 0x04;
if (wereAcked.constFind(reqMsgId) != wereAckedEnd) {
state |= 0x80; // we know, that server knows, that we received request
}
if (msgIdState == ReceivedMsgIds::State::NeedsAck) { // need ack, so we sent ack
if (msgIdState == ReceivedIdsManager::State::NeedsAck) { // need ack, so we sent ack
state |= 0x08;
} else {
state |= 0x10;
@@ -1632,7 +1695,9 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
return (badTime ? HandleResult::Ignored : HandleResult::Success);
}
if (badTime) {
if (serverSalt) _sessionData->setSalt(serverSalt); // requestsFixTimeSalt with no lookup
if (serverSalt) {
_sessionSalt = serverSalt; // requestsFixTimeSalt with no lookup
}
base::unixtime::update(serverTime, true);
DEBUG_LOG(("Message Info: unixtime updated from mtpc_msgs_state_info, now %1").arg(serverTime));
@@ -1710,13 +1775,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
}
requestsAcked(ids);
bool received = false;
MTPlong resMsgId = data.vanswer_msg_id();
{
QReadLocker lock(_sessionData->receivedIdsMutex());
received = (_sessionData->receivedIdsSet().lookup(resMsgId.v) != ReceivedMsgIds::State::NotFound);
}
if (received) {
const auto resMsgId = data.vanswer_msg_id();
if (_receivedMessageIds.lookup(resMsgId.v) != ReceivedIdsManager::State::NotFound) {
_ackRequestData.push_back(resMsgId);
} else {
DEBUG_LOG(("Message Info: answer message %1 was not received, requesting...").arg(resMsgId.v));
@@ -1737,13 +1797,8 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
DEBUG_LOG(("Message Info: msg new detailed info, answerId %2, status %3, bytes %4").arg(data.vanswer_msg_id().v).arg(data.vstatus().v).arg(data.vbytes().v));
bool received = false;
MTPlong resMsgId = data.vanswer_msg_id();
{
QReadLocker lock(_sessionData->receivedIdsMutex());
received = (_sessionData->receivedIdsSet().lookup(resMsgId.v) != ReceivedMsgIds::State::NotFound);
}
if (received) {
const auto resMsgId = data.vanswer_msg_id();
if (_receivedMessageIds.lookup(resMsgId.v) != ReceivedIdsManager::State::NotFound) {
_ackRequestData.push_back(resMsgId);
} else {
DEBUG_LOG(("Message Info: answer message %1 was not received, requesting...").arg(resMsgId.v));
@@ -1847,7 +1902,7 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
}
DEBUG_LOG(("Message Info: new server session created, unique_id %1, first_msg_id %2, server_salt %3").arg(data.vunique_id().v).arg(data.vfirst_msg_id().v).arg(data.vserver_salt().v));
_sessionData->setSalt(data.vserver_salt().v);
_sessionSalt = data.vserver_salt().v;
mtpMsgId firstMsgId = data.vfirst_msg_id().v;
QVector<quint64> toResend;
@@ -1993,7 +2048,9 @@ bool ConnectionPrivate::requestsFixTimeSalt(const QVector<MTPlong> &ids, int32 s
for (uint32 i = 0; i < idsCount; ++i) {
if (wasSent(ids[i].v)) {// found such msg_id in recent acked requests or in recent sent requests
if (serverSalt) _sessionData->setSalt(serverSalt);
if (serverSalt) {
_sessionSalt = serverSalt;
}
base::unixtime::update(serverTime, true);
return true;
}
@@ -2264,6 +2321,16 @@ void ConnectionPrivate::updateAuthKey() {
applyAuthKey(_sessionData->getTemporaryKey());
}
void ConnectionPrivate::setCurrentKeyId(uint64 newKeyId) {
if (_keyId == newKeyId) {
return;
}
_keyId = newKeyId;
DEBUG_LOG(("MTP Info: auth key id set to id %1").arg(newKeyId));
changeSessionId();
}
void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&temporaryKey) {
_temporaryKey = std::move(temporaryKey);
const auto newKeyId = _temporaryKey ? _temporaryKey->keyId() : 0;
@@ -2271,15 +2338,7 @@ void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&temporaryKey) {
if (_keyId == newKeyId) {
return;
}
_keyId = 0;
if (_sessionData->setCurrentKeyId(_keyId)) {
_ackRequestData.clear(); // #TODO move to sessionData.
_resendRequestData.clear();
{
QWriteLocker locker5(_sessionData->stateRequestMutex());
_sessionData->stateRequestMap().clear();
}
}
setCurrentKeyId(0);
DEBUG_LOG(("MTP Error: auth_key id for dc %1 changed, restarting..."
).arg(_shiftedDcId));
if (_connection) {
@@ -2290,15 +2349,7 @@ void ConnectionPrivate::applyAuthKey(AuthKeyPtr &&temporaryKey) {
if (!_connection) {
return;
}
if (newKeyId && _sessionData->setCurrentKeyId(newKeyId)) {
_ackRequestData.clear(); // #TODO move to sessionData.
_resendRequestData.clear();
{
QWriteLocker locker5(_sessionData->stateRequestMutex());
_sessionData->stateRequestMap().clear();
}
}
_keyId = newKeyId;
setCurrentKeyId(newKeyId);
Assert(!_connection->sentEncryptedWithKeyId());
DEBUG_LOG(("AuthKey Info: Connection update key from Session, dc %1 result: %2").arg(_shiftedDcId).arg(Logs::mb(&_keyId, sizeof(_keyId)).str()));
@@ -2355,7 +2406,7 @@ void ConnectionPrivate::tryAcquireKeyCreation() {
).arg(result->temporaryServerSalt
).arg(result->persistentServerSalt));
_sessionData->setSalt(result->temporaryServerSalt);
_sessionSalt = result->temporaryServerSalt;
if (result->persistentKey) {
_sessionData->clearForNewKey(_instance);
}
@@ -2395,7 +2446,7 @@ void ConnectionPrivate::authKeyChecked() {
handleReceived();
});
if (_sessionData->getSalt() && setState(ConnectedState)) {
if (_sessionSalt && setState(ConnectedState)) {
_sessionData->resendAll();
} // else receive salt in bad_server_salt first, then try to send all the requests
@@ -2469,11 +2520,8 @@ bool ConnectionPrivate::sendSecureRequest(
return false;
}
auto session = _sessionData->getSessionId();
auto salt = _sessionData->getSalt();
memcpy(request->data() + 0, &salt, 2 * sizeof(mtpPrime));
memcpy(request->data() + 2, &session, 2 * sizeof(mtpPrime));
memcpy(request->data() + 0, &_sessionSalt, 2 * sizeof(mtpPrime));
memcpy(request->data() + 2, &_sessionId, 2 * sizeof(mtpPrime));
auto from = request->constData() + 4;
MTP_LOG(_shiftedDcId, ("Send: ") + details::DumpToText(from, from + messageSize) + QString(" (keyId:%1)").arg(_temporaryKey->keyId()));