2
0
mirror of https://github.com/kotatogram/kotatogram-desktop synced 2025-08-31 06:35:14 +00:00

added lock for dcOptions, emojibox large emoji display, session management improved, new emoji dropdown started

This commit is contained in:
John Preston
2015-05-14 19:50:04 +03:00
parent d92356ce28
commit 136fd5c8e1
18 changed files with 378 additions and 244 deletions

View File

@@ -23,13 +23,14 @@ Copyright (c) 2014 John Preston, https://desktop.telegram.org
namespace {
typedef QMap<int32, MTProtoSessionPtr> Sessions;
Sessions sessions;
QVector<MTProtoSessionPtr> sessionsToKill;
MTProtoSessionPtr mainSession;
typedef QMap<mtpRequestId, int32> RequestsByDC; // holds dc for request to this dc or -dc for request to main dc
typedef QMap<mtpRequestId, int32> RequestsByDC; // holds dcWithShift for request to this dc or -dc for request to main dc
RequestsByDC requestsByDC;
QMutex requestByDCLock;
typedef QMap<mtpRequestId, int32> AuthExportRequests; // holds target dc for auth export request
typedef QMap<mtpRequestId, int32> AuthExportRequests; // holds target dcWithShift for auth export request
AuthExportRequests authExportRequests;
bool _started = false;
@@ -55,7 +56,7 @@ namespace {
BadGuestDCRequests badGuestDCRequests;
typedef QVector<mtpRequestId> DCAuthWaiters;
typedef QMap<int32, DCAuthWaiters> AuthWaiters;
typedef QMap<int32, DCAuthWaiters> AuthWaiters; // holds request ids waiting for auth import to specific dc
AuthWaiters authWaiters;
QMutex toClearLock;
@@ -76,12 +77,11 @@ namespace {
if (globalHandler.onFail && MTP::authedId()) (*globalHandler.onFail)(req, error); // auth failed in main dc
return;
}
int32 newdc = i.value();
int32 newdc = i.value() % _mtp_internal::dcShift;
DEBUG_LOG(("MTP Info: auth import to dc %1 succeeded").arg(newdc));
DCAuthWaiters &waiters(authWaiters[newdc]);
MTProtoSessionPtr session(_mtp_internal::getSession(newdc));
if (waiters.size()) {
QReadLocker locker(&requestMapLock);
for (DCAuthWaiters::iterator i = waiters.begin(), e = waiters.end(); i != e; ++i) {
@@ -91,6 +91,7 @@ namespace {
LOG(("MTP Error: could not find request %1 for resending").arg(requestId));
continue;
}
int32 dcWithShift = newdc;
{
RequestsByDC::iterator k = requestsByDC.find(requestId);
if (k == requestsByDC.cend()) {
@@ -101,11 +102,15 @@ namespace {
MTP::setdc(newdc);
k.value() = -newdc;
} else {
k.value() = k.value() - (k.value() % _mtp_internal::dcShift) + newdc;
int32 shift = k.value() - (k.value() % _mtp_internal::dcShift);
dcWithShift += shift;
k.value() = dcWithShift;
}
DEBUG_LOG(("MTP Info: resending request %1 to dc %2 after import auth").arg(requestId).arg(k.value()));
}
session->sendPrepared(j.value());
if (MTProtoSessionPtr session = _mtp_internal::getSession(dcWithShift)) {
session->sendPrepared(j.value());
}
}
waiters.clear();
}
@@ -121,8 +126,8 @@ namespace {
void exportDone(const MTPauth_ExportedAuthorization &result, mtpRequestId req) {
AuthExportRequests::const_iterator i = authExportRequests.constFind(req);
if (i == authExportRequests.cend()) {
LOG(("MTP Error: auth export request target dc not found, requestId: %1").arg(req));
RPCError error(rpcClientError("AUTH_IMPORT_FAIL", QString("did not find target dc, request %1").arg(req)));
LOG(("MTP Error: auth export request target dcWithShift not found, requestId: %1").arg(req));
RPCError error(rpcClientError("AUTH_IMPORT_FAIL", QString("did not find target dcWithShift, request %1").arg(req)));
if (globalHandler.onFail && MTP::authedId()) (*globalHandler.onFail)(req, error); // auth failed in main dc
return;
}
@@ -137,7 +142,7 @@ namespace {
AuthExportRequests::const_iterator i = authExportRequests.constFind(req);
if (i != authExportRequests.cend()) {
authWaiters[i.value()].clear();
authWaiters[i.value() % _mtp_internal::dcShift].clear();
}
if (globalHandler.onFail && MTP::authedId()) (*globalHandler.onFail)(req, error); // auth failed in main dc
return true;
@@ -151,31 +156,34 @@ namespace {
if ((m = QRegularExpression("^(FILE|PHONE|NETWORK|USER)_MIGRATE_(\\d+)$").match(err)).hasMatch()) {
if (!requestId) return false;
int32 dc = 0, newdc = m.captured(2).toInt();
int32 dcWithShift = 0, newdcWithShift = m.captured(2).toInt();
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i == requestsByDC.end()) {
LOG(("MTP Error: could not find request %1 for migrating to %2").arg(requestId).arg(newdc));
LOG(("MTP Error: could not find request %1 for migrating to %2").arg(requestId).arg(newdcWithShift));
} else {
dc = i.value();
dcWithShift = i.value();
}
}
if (!dc || !newdc) return false;
if (!dcWithShift || !newdcWithShift) return false;
DEBUG_LOG(("MTP Info: changing request %1 dc%2 to %3").arg(requestId).arg((dc > 0) ? "" : " and main dc").arg(newdc));
if (dc < 0) {
if (MTP::authedId() && !authExportRequests.contains(requestId)) { // import auth, set dc and resend
DEBUG_LOG(("MTP Info: importing auth to dc %1").arg(newdc));
DCAuthWaiters &waiters(authWaiters[newdc]);
DEBUG_LOG(("MTP Info: changing request %1 from dcWithShift%2 to dc%3").arg(requestId).arg(dcWithShift).arg(newdcWithShift));
if (dcWithShift < 0) { // newdc shift = 0
if (false && MTP::authedId() && !authExportRequests.contains(requestId)) { // migrate not supported at this moment
DEBUG_LOG(("MTP Info: importing auth to dc %1").arg(newdcWithShift));
DCAuthWaiters &waiters(authWaiters[newdcWithShift]);
if (!waiters.size()) {
authExportRequests.insert(MTP::send(MTPauth_ExportAuthorization(MTP_int(newdc)), rpcDone(exportDone), rpcFail(exportFail)), newdc);
authExportRequests.insert(MTP::send(MTPauth_ExportAuthorization(MTP_int(newdcWithShift)), rpcDone(exportDone), rpcFail(exportFail)), newdcWithShift);
}
waiters.push_back(requestId);
return true;
} else {
MTP::setdc(newdc);
MTP::setdc(newdcWithShift);
}
} else {
int32 shift = dcWithShift - (dcWithShift % _mtp_internal::dcShift);
newdcWithShift += shift;
}
mtpRequest req;
@@ -188,8 +196,10 @@ namespace {
}
req = i.value();
}
_mtp_internal::registerRequest(requestId, (dc < 0) ? -newdc : newdc);
_mtp_internal::getSession(newdc)->sendPrepared(req);
if (MTProtoSessionPtr session = _mtp_internal::getSession(newdcWithShift)) {
_mtp_internal::registerRequest(requestId, (dcWithShift < 0) ? -newdcWithShift : newdcWithShift);
session->sendPrepared(req);
}
return true;
} else if (code < 0 || code >= 500 || (m = QRegularExpression("^FLOOD_WAIT_(\\d+)$").match(err)).hasMatch()) {
if (!requestId) return false;
@@ -218,26 +228,26 @@ namespace {
return true;
} else if (code == 401 || (badGuestDC && badGuestDCRequests.constFind(requestId) == badGuestDCRequests.cend())) {
int32 dc = 0;
int32 dcWithShift = 0;
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i != requestsByDC.end()) {
dc = i.value();
dcWithShift = i.value();
} else {
LOG(("MTP Error: unauthorized request without dc info, requestId %1").arg(requestId));
}
}
int32 newdc = abs(dc) % _mtp_internal::dcShift;
int32 newdc = abs(dcWithShift) % _mtp_internal::dcShift;
if (!newdc || newdc == mtpMainDC() || !MTP::authedId()) {
if (!badGuestDC && globalHandler.onFail) (*globalHandler.onFail)(requestId, error); // auth failed in main dc
return false;
}
DEBUG_LOG(("MTP Info: importing auth to dc %1").arg(dc));
DEBUG_LOG(("MTP Info: importing auth to dcWithShift %1").arg(dcWithShift));
DCAuthWaiters &waiters(authWaiters[newdc]);
if (!waiters.size()) {
authExportRequests.insert(MTP::send(MTPauth_ExportAuthorization(MTP_int(newdc)), rpcDone(exportDone), rpcFail(exportFail)), newdc);
authExportRequests.insert(MTP::send(MTPauth_ExportAuthorization(MTP_int(newdc)), rpcDone(exportDone), rpcFail(exportFail)), abs(dcWithShift));
}
waiters.push_back(requestId);
if (badGuestDC) badGuestDCRequests.insert(requestId);
@@ -253,20 +263,22 @@ namespace {
}
req = i.value();
}
int32 dc = 0;
int32 dcWithShift = 0;
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i == requestsByDC.end()) {
LOG(("MTP Error: could not find request %1 for resending with init connection").arg(requestId));
} else {
dc = i.value();
dcWithShift = i.value();
}
}
if (!dc) return false;
if (!dcWithShift) return false;
req->needsLayer = true;
_mtp_internal::getSession(dc < 0 ? (-dc) : dc)->sendPrepared(req);
if (MTProtoSessionPtr session = _mtp_internal::getSession(dcWithShift < 0 ? (-dcWithShift) : dcWithShift)) {
req->needsLayer = true;
session->sendPrepared(req);
}
return true;
} else if (err == qsl("MSG_WAIT_FAILED")) {
mtpRequest req;
@@ -283,7 +295,7 @@ namespace {
LOG(("MTP Error: wait failed for not dependent request %1").arg(requestId));
return false;
}
int32 dc = 0;
int32 dcWithShift = 0;
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId), j = requestsByDC.find(req->after->requestId);
@@ -292,19 +304,21 @@ namespace {
} else if (j == requestsByDC.end()) {
LOG(("MTP Error: could not find dependent request %1 by dc").arg(req->after->requestId));
} else {
dc = i.value();
dcWithShift = i.value();
if (i.value() != j.value()) {
req->after = mtpRequest();
}
}
}
if (!dc) return false;
if (!dcWithShift) return false;
if (!req->after) {
req->needsLayer = true;
_mtp_internal::getSession(dc < 0 ? (-dc) : dc)->sendPrepared(req);
if (MTProtoSessionPtr session = _mtp_internal::getSession(dcWithShift < 0 ? (-dcWithShift) : dcWithShift)) {
req->needsLayer = true;
session->sendPrepared(req);
}
} else {
int32 newdc = abs(dc) % _mtp_internal::dcShift;
int32 newdc = abs(dcWithShift) % _mtp_internal::dcShift;
DCAuthWaiters &waiters(authWaiters[newdc]);
if (waiters.indexOf(req->after->requestId) >= 0) {
if (waiters.indexOf(requestId) < 0) {
@@ -338,27 +352,27 @@ namespace {
}
namespace _mtp_internal {
MTProtoSessionPtr getSession(int32 dc) {
MTProtoSessionPtr getSession(int32 dcWithShift) {
if (!_started) return MTProtoSessionPtr();
if (!dc) return mainSession;
if (!(dc % _mtp_internal::dcShift)) {
dc += mainSession->getDC();
if (!dcWithShift) return mainSession;
if (!(dcWithShift % _mtp_internal::dcShift)) {
dcWithShift += (mainSession->getDcWithShift() % _mtp_internal::dcShift);
}
Sessions::const_iterator i = sessions.constFind(dc);
Sessions::const_iterator i = sessions.constFind(dcWithShift);
if (i != sessions.cend()) return *i;
MTProtoSessionPtr result(new MTProtoSession());
result->start(dc);
result->start(dcWithShift);
sessions.insert(dc, result);
sessions.insert(dcWithShift, result);
return result;
}
void registerRequest(mtpRequestId requestId, int32 dc) {
void registerRequest(mtpRequestId requestId, int32 dcWithShift) {
{
QMutexLocker locker(&requestByDCLock);
requestsByDC.insert(requestId, dc);
requestsByDC.insert(requestId, dcWithShift);
}
_mtp_internal::performDelayedClear(); // need to do it somewhere..
}
@@ -530,12 +544,12 @@ namespace _mtp_internal {
if (globalHandler.onDone) (*globalHandler.onDone)(0, from, end); // some updates were received
}
void onStateChange(int32 dc, int32 state) {
if (stateChangedHandler) stateChangedHandler(dc, state);
void onStateChange(int32 dcWithShift, int32 state) {
if (stateChangedHandler) stateChangedHandler(dcWithShift, state);
}
void onSessionReset(int32 dc) {
if (sessionResetHandler) sessionResetHandler(dc);
void onSessionReset(int32 dcWithShift) {
if (sessionResetHandler) sessionResetHandler(dcWithShift);
}
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err) { // return true if need to clean request data
@@ -561,12 +575,12 @@ namespace _mtp_internal {
mtpRequestId requestId = delayedRequests.front().first;
delayedRequests.pop_front();
int32 dc = 0;
int32 dcWithShift = 0;
{
QMutexLocker locker(&requestByDCLock);
RequestsByDC::const_iterator i = requestsByDC.constFind(requestId);
if (i != requestsByDC.cend()) {
dc = i.value();
dcWithShift = i.value();
} else {
LOG(("MTP Error: could not find request dc for delayed resend, requestId %1").arg(requestId));
continue;
@@ -583,7 +597,9 @@ namespace _mtp_internal {
}
req = j.value();
}
_mtp_internal::getSession(dc < 0 ? (-dc) : dc)->sendPrepared(req);
if (MTProtoSessionPtr session = _mtp_internal::getSession(dcWithShift < 0 ? (-dcWithShift) : dcWithShift)) {
session->sendPrepared(req);
}
}
if (!delayedRequests.isEmpty()) {
@@ -595,13 +611,15 @@ namespace _mtp_internal {
namespace MTP {
void start() {
if (started()) return;
unixtimeInit();
MTProtoDCMap &dcs(mtpDCMap());
mainSession = MTProtoSessionPtr(new MTProtoSession());
mainSession->start(mtpMainDC());
sessions[mainSession->getDC()] = mainSession;
sessions[mainSession->getDcWithShift()] = mainSession;
_started = true;
resender = new _mtp_internal::RequestResender();
@@ -625,8 +643,9 @@ namespace MTP {
void restart(int32 dcMask) {
if (!_started) return;
dcMask %= _mtp_internal::dcShift;
for (Sessions::const_iterator i = sessions.cbegin(), e = sessions.cend(); i != e; ++i) {
if ((*i)->getDC() % _mtp_internal::dcShift == dcMask % _mtp_internal::dcShift) {
if (((*i)->getDcWithShift() % _mtp_internal::dcShift) == dcMask) {
(*i)->restart();
}
}
@@ -641,8 +660,9 @@ namespace MTP {
void setdc(int32 dc, bool fromZeroOnly) {
if (!dc || !_started) return;
mtpSetDC(dc, fromZeroOnly);
if (maindc() != mainSession->getDC()) {
mainSession = _mtp_internal::getSession(maindc());
int32 oldMainDc = mainSession->getDcWithShift();
if (maindc() != oldMainDc) {
killSession(oldMainDc);
}
Local::writeMtpData();
}
@@ -656,7 +676,7 @@ namespace MTP {
if (!dc) return mainSession->getState();
if (!(dc % _mtp_internal::dcShift)) {
dc += mainSession->getDC();
dc += (mainSession->getDcWithShift() % _mtp_internal::dcShift);
}
Sessions::const_iterator i = sessions.constFind(dc);
@@ -670,7 +690,7 @@ namespace MTP {
if (!dc) return mainSession->transport();
if (!(dc % _mtp_internal::dcShift)) {
dc += mainSession->getDC();
dc += (mainSession->getDcWithShift() % _mtp_internal::dcShift);
}
Sessions::const_iterator i = sessions.constFind(dc);
@@ -679,16 +699,10 @@ namespace MTP {
return QString();
}
void initdc(int32 dc) {
if (!_started) return;
_mtp_internal::getSession(dc);
}
void ping() {
MTProtoSessionPtr session = _mtp_internal::getSession(0);
if (!session) return;
return session->ping();
if (MTProtoSessionPtr session = _mtp_internal::getSession(0)) {
session->ping();
}
}
void cancel(mtpRequestId requestId) {
@@ -706,25 +720,35 @@ namespace MTP {
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i != requestsByDC.end()) {
_mtp_internal::getSession(abs(i.value()))->cancel(requestId, msgId);
if (MTProtoSessionPtr session = _mtp_internal::getSession(abs(i.value()))) {
session->cancel(requestId, msgId);
}
requestsByDC.erase(i);
}
}
_mtp_internal::clearCallbacks(requestId);
}
void killSessionsDelayed() {
if (!sessionsToKill.isEmpty()) {
sessionsToKill.clear();
}
}
void killSession(int32 dc) {
Sessions::iterator i = sessions.find(dc);
if (i != sessions.end()) {
bool wasMain = (i.value() == mainSession);
i.value()->stop();
i.value()->kill();
if (sessionsToKill.isEmpty()) QTimer::singleShot(0, killSessionsDelayed);
sessionsToKill.push_back(i.value());
sessions.erase(i);
if (wasMain) {
mainSession = MTProtoSessionPtr(new MTProtoSession());
mainSession->start(mtpMainDC());
sessions[mainSession->getDC()] = mainSession;
sessions[mainSession->getDcWithShift()] = mainSession;
}
}
}
@@ -743,22 +767,30 @@ namespace MTP {
QMutexLocker locker(&requestByDCLock);
RequestsByDC::iterator i = requestsByDC.find(requestId);
if (i != requestsByDC.end()) {
return _mtp_internal::getSession(abs(i.value()))->requestState(requestId);
if (MTProtoSessionPtr session = _mtp_internal::getSession(abs(i.value()))) {
return session->requestState(requestId);
}
return MTP::RequestConnecting;
}
return MTP::RequestSent;
}
return _mtp_internal::getSession(-requestId)->requestState(0);
if (MTProtoSessionPtr session = _mtp_internal::getSession(-requestId)) {
return session->requestState(0);
}
return MTP::RequestConnecting;
}
void stop() {
for (Sessions::iterator i = sessions.begin(), e = sessions.end(); i != e; ++i) {
i.value()->stop();
i.value()->kill();
}
sessions.clear();
mainSession = MTProtoSessionPtr();
delete resender;
resender = 0;
mtpDestroyConfigLoader();
_started = false;
}
void authed(int32 uid) {
@@ -810,4 +842,8 @@ namespace MTP {
return mtpSetKey(dc, key);
}
QReadWriteLock *dcOptionsMutex() {
return mtpDcOptionsMutex();
}
};

View File

@@ -37,8 +37,8 @@ namespace _mtp_internal {
void execCallback(mtpRequestId requestId, const mtpPrime *from, const mtpPrime *end);
bool hasCallbacks(mtpRequestId requestId);
void globalCallback(const mtpPrime *from, const mtpPrime *end);
void onStateChange(int32 dc, int32 state);
void onSessionReset(int32 dc);
void onStateChange(int32 dcWithShift, int32 state);
void onSessionReset(int32 dcWithShift);
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err); // return true if need to clean request data
inline bool rpcErrorOccured(mtpRequestId requestId, const RPCResponseHandler &handler, const RPCError &err) {
return rpcErrorOccured(requestId, handler.onFail, err);
@@ -64,6 +64,7 @@ namespace _mtp_internal {
namespace MTP {
static const uint32 cfg = 1 * _mtp_internal::dcShift; // send(MTPhelp_GetConfig(), MTP::cfg + dc) - for dc enum
static const uint32 lgt = 2 * _mtp_internal::dcShift; // send(MTPauth_LogOut(), MTP::lgt + dc) - for logout of guest dcs enum
static const uint32 dld[MTPDownloadSessionsCount] = { // send(req, callbacks, MTP::dld[i] + dc) - for download
0x10 * _mtp_internal::dcShift,
0x11 * _mtp_internal::dcShift,
@@ -89,13 +90,13 @@ namespace MTP {
int32 dcstate(int32 dc = 0);
QString dctransport(int32 dc = 0);
void initdc(int32 dc);
template <typename TRequest>
inline mtpRequestId send(const TRequest &request, RPCResponseHandler callbacks = RPCResponseHandler(), int32 dc = 0, uint64 msCanWait = 0, mtpRequestId after = 0) {
MTProtoSessionPtr session = _mtp_internal::getSession(dc);
if (!session) return 0;
return session->send(request, callbacks, msCanWait, true, !dc, after);
if (MTProtoSessionPtr session = _mtp_internal::getSession(dc)) {
return session->send(request, callbacks, msCanWait, true, !dc, after);
}
return 0;
}
template <typename TRequest>
inline mtpRequestId send(const TRequest &request, RPCDoneHandlerPtr onDone, RPCFailHandlerPtr onFail = RPCFailHandlerPtr(), int32 dc = 0, uint64 msCanWait = 0, mtpRequestId after = 0) {
@@ -139,6 +140,8 @@ namespace MTP {
mtpKeysMap getKeys();
void setKey(int32 dc, mtpAuthKeyPtr key);
QReadWriteLock *dcOptionsMutex();
};
#include "mtproto/mtpSessionImpl.h"

View File

@@ -1114,6 +1114,7 @@ MTProtoConnectionPrivate::MTProtoConnectionPrivate(QThread *thread, MTProtoConne
// createConn();
if (!dc) {
QReadLocker lock(mtpDcOptionsMutex());
const mtpDcOptions &options(cDcOptions());
if (options.isEmpty()) {
LOG(("MTP Error: connect failed, no DCs"));
@@ -1713,6 +1714,8 @@ void MTProtoConnectionPrivate::retryByTimer() {
}
if (keyId == mtpAuthKey::RecreateKeyId) {
if (sessionData->getKey()) {
unlockKey();
QWriteLocker lock(sessionData->keyMutex());
sessionData->owner()->destroyKey();
}
@@ -1738,31 +1741,34 @@ void MTProtoConnectionPrivate::socketStart(bool afterConfig) {
_pingId = _pingMsgId = _pingIdToSend = _pingSendAt = 0;
_pingSender.stop();
const mtpDcOption *dcOption = 0;
const mtpDcOptions &options(cDcOptions());
mtpDcOptions::const_iterator dcIndex = options.constFind(dc % _mtp_internal::dcShift);
DEBUG_LOG(("MTP Info: connecting to DC %1..").arg(dc));
if (dcIndex == options.cend()) {
std::string ip;
uint32 port = 0;
{
QReadLocker lock(mtpDcOptionsMutex());
const mtpDcOptions &options(cDcOptions());
mtpDcOptions::const_iterator dcIndex = options.constFind(dc % _mtp_internal::dcShift);
DEBUG_LOG(("MTP Info: connecting to DC %1..").arg(dc));
if (dcIndex != options.cend()) {
ip = dcIndex->ip;
port = dcIndex->port;
}
}
if (!port || ip.empty()) {
if (afterConfig) {
LOG(("MTP Error: DC %1 options not found right after config load!").arg(dc));
return restart();
} else {
DEBUG_LOG(("MTP Info: DC %1 options not found, waiting for config").arg(dc));
connect(mtpConfigLoader(), SIGNAL(loaded()), this, SLOT(onConfigLoaded()));
mtpConfigLoader()->load();
return;
}
DEBUG_LOG(("MTP Info: DC %1 options not found, waiting for config").arg(dc));
connect(mtpConfigLoader(), SIGNAL(loaded()), this, SLOT(onConfigLoaded()));
mtpConfigLoader()->load();
return;
}
dcOption = &dcIndex.value();
const char *ip(dcOption->ip.c_str());
uint32 port(dcOption->port);
DEBUG_LOG(("MTP Info: socket connection to %1:%2..").arg(ip).arg(port));
DEBUG_LOG(("MTP Info: socket connection to %1:%2..").arg(ip.c_str()).arg(port));
connect(conn, SIGNAL(connected()), this, SLOT(onConnected()));
connect(conn, SIGNAL(disconnected()), this, SLOT(restart()));
conn->connectToServer(ip, port);
conn->connectToServer(ip.c_str(), port);
}
void MTProtoConnectionPrivate::restart(bool maybeBadKey) {
@@ -2824,12 +2830,36 @@ void MTProtoConnectionPrivate::onConnected() {
TCP_LOG(("Connection Info: connection succeed."));
if (updateAuthKey()) {
DEBUG_LOG(("MTP Info: returning from socketConnected.."));
return;
updateAuthKey();
}
void MTProtoConnectionPrivate::updateAuthKey() {
QReadLocker lockFinished(&sessionDataMutex);
if (!sessionData || !conn) return;
DEBUG_LOG(("AuthKey Info: MTProtoConnection updating key from MTProtoSession, dc %1").arg(dc));
uint64 newKeyId = 0;
{
ReadLockerAttempt lock(sessionData->keyMutex());
if (!lock) {
DEBUG_LOG(("MTP Info: could not lock auth_key for read, waiting signal emit"));
clearMessages();
keyId = newKeyId;
return; // some other connection is getting key
}
const mtpAuthKeyPtr &key(sessionData->getKey());
newKeyId = key ? key->keyId() : 0;
}
if (keyId != newKeyId) {
clearMessages();
keyId = newKeyId;
}
DEBUG_LOG(("AuthKey Info: MTProtoConnection update key from MTProtoSession, dc %1 result: %2").arg(dc).arg(mb(&keyId, sizeof(keyId)).str()));
if (keyId) {
return authKeyCreated();
}
DEBUG_LOG(("MTP Info: will be creating auth_key"));
DEBUG_LOG(("AuthKey Info: No key in updateAuthKey(), will be creating auth_key"));
lockKey();
const mtpAuthKeyPtr &key(sessionData->getKey());
@@ -2854,36 +2884,6 @@ void MTProtoConnectionPrivate::onConnected() {
sendRequestNotSecure(req_pq);
}
bool MTProtoConnectionPrivate::updateAuthKey() {
QReadLocker lockFinished(&sessionDataMutex);
if (!sessionData || !conn) return false;
DEBUG_LOG(("AuthKey Info: MTProtoConnection updating key from MTProtoSession, dc %1").arg(dc));
uint64 newKeyId = 0;
{
ReadLockerAttempt lock(sessionData->keyMutex());
if (!lock) {
DEBUG_LOG(("MTP Info: could not lock auth_key for read, waiting signal emit"));
clearMessages();
keyId = newKeyId;
return true; // some other connection is getting key
}
const mtpAuthKeyPtr &key(sessionData->getKey());
newKeyId = key ? key->keyId() : 0;
}
if (keyId != newKeyId) {
clearMessages();
keyId = newKeyId;
}
DEBUG_LOG(("AuthKey Info: MTProtoConnection update key from MTProtoSession, dc %1 result: %2").arg(dc).arg(mb(&keyId, sizeof(keyId)).str()));
if (keyId) {
authKeyCreated();
return true;
}
DEBUG_LOG(("AuthKey Info: Key update failed"));
return false;
}
void MTProtoConnectionPrivate::clearMessages() {
if (keyId && keyId != mtpAuthKey::RecreateKeyId && conn) {
conn->received().clear();
@@ -3481,7 +3481,14 @@ MTProtoConnectionPrivate::~MTProtoConnectionPrivate() {
void MTProtoConnectionPrivate::stop() {
QWriteLocker lockFinished(&sessionDataMutex);
sessionData = 0;
if (sessionData) {
if (myKeyLock) {
sessionData->owner()->notifyKeyCreated(mtpAuthKeyPtr()); // release key lock, let someone else create it
sessionData->keyMutex()->unlock();
myKeyLock = false;
}
sessionData = 0;
}
}
MTProtoConnection::~MTProtoConnection() {

View File

@@ -366,7 +366,7 @@ public slots:
// Sessions signals, when we need to send something
void tryToSend();
bool updateAuthKey();
void updateAuthKey();
void onConfigLoaded();

View File

@@ -56,6 +56,20 @@ int32 mtpMainDC() {
return mainDC;
}
namespace {
QMap<int32, mtpRequestId> logoutGuestMap; // dcWithShift to logout request id
bool logoutDone(mtpRequestId req) {
for (QMap<int32, mtpRequestId>::iterator i = logoutGuestMap.begin(); i != logoutGuestMap.end(); ++i) {
if (i.value() == req) {
MTP::killSession(i.key());
logoutGuestMap.erase(i);
return true;
}
}
return false;
}
}
void mtpLogoutOtherDCs() {
QList<int32> dcs;
{
@@ -64,7 +78,7 @@ void mtpLogoutOtherDCs() {
}
for (int32 i = 0, cnt = dcs.size(); i != cnt; ++i) {
if (dcs[i] != MTP::maindc()) {
MTP::send(MTPauth_LogOut(), RPCResponseHandler(), dcs[i]);
logoutGuestMap.insert(MTP::lgt + dcs[i], MTP::send(MTPauth_LogOut(), rpcDone(&logoutDone), rpcFail(&logoutDone), MTP::lgt + dcs[i]));
}
}
}
@@ -154,7 +168,11 @@ namespace {
void mtpUpdateDcOptions(const QVector<MTPDcOption> &options) {
QSet<int32> already, restart;
{
mtpDcOptions opts(cDcOptions());
mtpDcOptions opts;
{
QReadLocker lock(mtpDcOptionsMutex());
opts = cDcOptions();
}
for (QVector<MTPDcOption>::const_iterator i = options.cbegin(), e = options.cend(); i != e; ++i) {
const MTPDdcOption &optData(i->c_dcOption());
if (already.constFind(optData.vid.v) == already.cend()) {
@@ -168,16 +186,26 @@ void mtpUpdateDcOptions(const QVector<MTPDcOption> &options) {
opts.insert(optData.vid.v, mtpDcOption(optData.vid.v, optData.vhostname.c_string().v, optData.vip_address.c_string().v, optData.vport.v));
}
}
cSetDcOptions(opts);
{
QWriteLocker lock(mtpDcOptionsMutex());
cSetDcOptions(opts);
}
}
for (QSet<int32>::const_iterator i = restart.cbegin(), e = restart.cend(); i != e; ++i) {
MTP::restart(*i);
}
}
namespace {
QReadWriteLock _dcOptionsMutex;
}
QReadWriteLock *mtpDcOptionsMutex() {
return &_dcOptionsMutex;
}
MTProtoConfigLoader::MTProtoConfigLoader() : _enumCurrent(0), _enumRequest(0) {
connect(&_enumDCTimer, SIGNAL(timeout()), this, SLOT(enumDC()));
connect(this, SIGNAL(killCurrentSession(qint32,qint32)), this, SLOT(onKillCurrentSession(qint32,qint32)), Qt::QueuedConnection);
}
void MTProtoConfigLoader::load() {
@@ -189,23 +217,15 @@ void MTProtoConfigLoader::load() {
_enumDCTimer.start(MTPEnumDCTimeout);
}
void MTProtoConfigLoader::onKillCurrentSession(qint32 request, qint32 current) {
if (request == _enumRequest && current == _enumCurrent) {
if (_enumRequest) {
MTP::cancel(_enumRequest);
_enumRequest = 0;
}
if (_enumCurrent) {
MTP::killSession(MTP::cfg + _enumCurrent);
_enumCurrent = 0;
}
}
}
void MTProtoConfigLoader::done() {
_enumDCTimer.stop();
if (_enumRequest || _enumCurrent) {
emit killCurrentSession(_enumRequest, _enumCurrent);
if (_enumRequest) {
MTP::cancel(_enumRequest);
_enumRequest = 0;
}
if (_enumCurrent) {
MTP::killSession(MTP::cfg + _enumCurrent);
_enumCurrent = 0;
}
emit loaded();
}
@@ -220,14 +240,16 @@ void MTProtoConfigLoader::enumDC() {
} else {
MTP::killSession(MTP::cfg + _enumCurrent);
}
const mtpDcOptions &options(cDcOptions());
for (mtpDcOptions::const_iterator i = options.cbegin(), e = options.cend(); i != e; ++i) {
if (i.key() == _enumCurrent) {
_enumCurrent = (++i == e) ? options.cbegin().key() : i.key();
break;
{
QReadLocker lock(mtpDcOptionsMutex());
const mtpDcOptions &options(cDcOptions());
for (mtpDcOptions::const_iterator i = options.cbegin(), e = options.cend(); i != e; ++i) {
if (i.key() == _enumCurrent) {
_enumCurrent = (++i == e) ? options.cbegin().key() : i.key();
break;
}
}
}
_enumRequest = MTP::send(MTPhelp_GetConfig(), rpcDone(configLoaded), rpcFail(configFailed), MTP::cfg + _enumCurrent);
_enumDCTimer.start(MTPEnumDCTimeout);

View File

@@ -72,12 +72,10 @@ public:
public slots:
void enumDC();
void onKillCurrentSession(qint32 request, qint32 session);
signals:
void loaded();
void killCurrentSession(qint32 request, qint32 session);
private:
@@ -104,3 +102,4 @@ mtpKeysMap mtpGetKeys();
void mtpSetKey(int32 dc, mtpAuthKeyPtr key);
void mtpUpdateDcOptions(const QVector<MTPDcOption> &options);
QReadWriteLock *mtpDcOptionsMutex();

View File

@@ -63,11 +63,15 @@ void MTPSessionData::clear() {
}
MTProtoSession::MTProtoSession() : data(this), dcId(0), dc(0), msSendCall(0), msWait(0), _ping(false) {
MTProtoSession::MTProtoSession() : _killed(false), data(this), dcWithShift(0), dc(0), msSendCall(0), msWait(0), _ping(false) {
}
void MTProtoSession::start(int32 dcenter) {
if (dcId) {
if (_killed) {
DEBUG_LOG(("Session Error: can't start a killed session"));
return;
}
if (dcWithShift) {
DEBUG_LOG(("Session Info: MTProtoSession::start called on already started session"));
return;
}
@@ -84,8 +88,8 @@ void MTProtoSession::start(int32 dcenter) {
connections.reserve(cConnectionsInSession());
for (uint32 i = 0; i < cConnectionsInSession(); ++i) {
connections.push_back(new MTProtoConnection());
dcId = connections.back()->start(&data, dcenter);
if (!dcId) {
dcWithShift = connections.back()->start(&data, dcenter);
if (!dcWithShift) {
for (MTProtoConnections::const_iterator j = connections.cbegin(), e = connections.cend(); j != e; ++j) {
delete *j;
}
@@ -94,11 +98,12 @@ void MTProtoSession::start(int32 dcenter) {
return;
}
if (!dc) {
dcenter = dcId;
MTProtoDCMap::const_iterator dcIndex = dcs.constFind(dcId % _mtp_internal::dcShift);
dcenter = dcWithShift;
int32 dcId = dcWithShift % _mtp_internal::dcShift;
MTProtoDCMap::const_iterator dcIndex = dcs.constFind(dcId);
if (dcIndex == dcs.cend()) {
dc = MTProtoDCPtr(new MTProtoDC(dcId % _mtp_internal::dcShift, mtpAuthKeyPtr()));
dcs.insert(dcId % _mtp_internal::dcShift, dc);
dc = MTProtoDCPtr(new MTProtoDC(dcId, mtpAuthKeyPtr()));
dcs.insert(dcWithShift % _mtp_internal::dcShift, dc);
} else {
dc = dcIndex.value();
}
@@ -115,18 +120,32 @@ void MTProtoSession::start(int32 dcenter) {
}
void MTProtoSession::restart() {
if (_killed) {
DEBUG_LOG(("Session Error: can't restart a killed session"));
return;
}
emit needToRestart();
}
void MTProtoSession::stop() {
DEBUG_LOG(("Session Info: stopping session %1").arg(dcId));
DEBUG_LOG(("Session Info: stopping session dcWithShift %1").arg(dcWithShift));
while (!connections.isEmpty()) {
connections.back()->stop();
connections.pop_back();
}
}
void MTProtoSession::kill() {
stop();
_killed = true;
DEBUG_LOG(("Session Info: marked session dcWithShift %1 as killed").arg(dcWithShift));
}
void MTProtoSession::sendAnything(quint64 msCanWait) {
if (_killed) {
DEBUG_LOG(("Session Error: can't send anything in a killed session"));
return;
}
uint64 ms = getms(true);
if (msSendCall) {
if (ms > msSendCall + msWait) {
@@ -141,11 +160,11 @@ void MTProtoSession::sendAnything(quint64 msCanWait) {
msWait = msCanWait;
}
if (msWait) {
DEBUG_LOG(("MTP Info: dc %1 can wait for %2ms from current %3").arg(dcId).arg(msWait).arg(msSendCall));
DEBUG_LOG(("MTP Info: dcWithShift %1 can wait for %2ms from current %3").arg(dcWithShift).arg(msWait).arg(msSendCall));
msSendCall = ms;
sender.start(msWait);
} else {
DEBUG_LOG(("MTP Info: dc %1 stopped send timer, can wait for %2ms from current %3").arg(dcId).arg(msWait).arg(msSendCall));
DEBUG_LOG(("MTP Info: dcWithShift %1 stopped send timer, can wait for %2ms from current %3").arg(dcWithShift).arg(msWait).arg(msSendCall));
sender.stop();
msSendCall = 0;
needToResumeAndSend();
@@ -153,20 +172,24 @@ void MTProtoSession::sendAnything(quint64 msCanWait) {
}
void MTProtoSession::needToResumeAndSend() {
if (_killed) {
DEBUG_LOG(("Session Info: can't resume a killed session"));
return;
}
if (connections.isEmpty()) {
DEBUG_LOG(("Session Info: resuming session %1").arg(dcId));
DEBUG_LOG(("Session Info: resuming session dcWithShift %1").arg(dcWithShift));
MTProtoDCMap &dcs(mtpDCMap());
connections.reserve(cConnectionsInSession());
for (uint32 i = 0; i < cConnectionsInSession(); ++i) {
connections.push_back(new MTProtoConnection());
if (!connections.back()->start(&data, dcId)) {
if (!connections.back()->start(&data, dcWithShift)) {
for (MTProtoConnections::const_iterator j = connections.cbegin(), e = connections.cend(); j != e; ++j) {
delete *j;
}
connections.clear();
DEBUG_LOG(("Session Info: could not start connection %1 to dc %2").arg(i).arg(dcId));
dcId = 0;
DEBUG_LOG(("Session Info: could not start connection %1 to dcWithShift %2").arg(i).arg(dcWithShift));
dcWithShift = 0;
return;
}
}
@@ -259,11 +282,11 @@ void MTProtoSession::checkRequestsByTimer() {
}
void MTProtoSession::onConnectionStateChange(qint32 newState) {
_mtp_internal::onStateChange(dcId, newState);
_mtp_internal::onStateChange(dcWithShift, newState);
}
void MTProtoSession::onResetDone() {
_mtp_internal::onSessionReset(dcId);
_mtp_internal::onSessionReset(dcWithShift);
}
void MTProtoSession::cancel(mtpRequestId requestId, mtpMsgId msgId) {
@@ -428,23 +451,23 @@ QReadWriteLock *MTProtoSession::keyMutex() const {
}
void MTProtoSession::authKeyCreatedForDC() {
DEBUG_LOG(("AuthKey Info: MTProtoSession::authKeyCreatedForDC slot, emitting authKeyCreated(), dc %1").arg(dcId));
DEBUG_LOG(("AuthKey Info: MTProtoSession::authKeyCreatedForDC slot, emitting authKeyCreated(), dcWithShift %1").arg(dcWithShift));
data.setKey(dc->getKey());
emit authKeyCreated();
}
void MTProtoSession::notifyKeyCreated(const mtpAuthKeyPtr &key) {
DEBUG_LOG(("AuthKey Info: MTProtoSession::keyCreated(), setting, dc %1").arg(dcId));
DEBUG_LOG(("AuthKey Info: MTProtoSession::keyCreated(), setting, dcWithShift %1").arg(dcWithShift));
dc->setKey(key);
}
void MTProtoSession::layerWasInitedForDC(bool wasInited) {
DEBUG_LOG(("MTP Info: MTProtoSession::layerWasInitedForDC slot, dc %1").arg(dcId));
DEBUG_LOG(("MTP Info: MTProtoSession::layerWasInitedForDC slot, dcWithShift %1").arg(dcWithShift));
data.setLayerWasInited(wasInited);
}
void MTProtoSession::notifyLayerInited(bool wasInited) {
DEBUG_LOG(("MTP Info: emitting MTProtoDC::layerWasInited(%1), dc %2").arg(logBool(wasInited)).arg(dcId));
DEBUG_LOG(("MTP Info: emitting MTProtoDC::layerWasInited(%1), dcWithShift %2").arg(logBool(wasInited)).arg(dcWithShift));
dc->setConnectionInited(wasInited);
emit dc->layerWasInited(wasInited);
}
@@ -453,7 +476,7 @@ void MTProtoSession::destroyKey() {
if (!dc) return;
if (data.getKey()) {
DEBUG_LOG(("MTP Info: destroying auth_key for dc %1").arg(dcId));
DEBUG_LOG(("MTP Info: destroying auth_key for dcWithShift %1").arg(dcWithShift));
if (data.getKey() == dc->getKey()) {
dc->destroyKey();
}
@@ -461,8 +484,8 @@ void MTProtoSession::destroyKey() {
}
}
int32 MTProtoSession::getDC() const {
return dcId;
int32 MTProtoSession::getDcWithShift() const {
return dcWithShift;
}
void MTProtoSession::tryToReceive() {
@@ -481,7 +504,7 @@ void MTProtoSession::tryToReceive() {
responses.erase(i);
}
if (requestId <= 0) {
if (dcId < int(_mtp_internal::dcShift)) { // call globalCallback only in main session
if (dcWithShift < int(_mtp_internal::dcShift)) { // call globalCallback only in main session
_mtp_internal::globalCallback(response.constData(), response.constData() + response.size());
}
} else {

View File

@@ -224,8 +224,9 @@ public:
void start(int32 dcenter = 0);
void restart();
void stop();
void kill();
int32 getDC() const;
int32 getDcWithShift() const;
~MTProtoSession();
QReadWriteLock *keyMutex() const;
@@ -275,10 +276,12 @@ private:
typedef QList<MTProtoConnection*> MTProtoConnections;
MTProtoConnections connections;
bool _killed;
MTPSessionData data;
int32 dcId;
int32 dcWithShift;
MTProtoDCPtr dc;
uint64 msSendCall, msWait;

View File

@@ -37,6 +37,6 @@ mtpRequestId MTProtoSession::send(const TRequest &request, RPCResponseHandler ca
requestId = 0;
_mtp_internal::rpcErrorOccured(requestId, callbacks, rpcClientError("NO_REQUEST_ID", QString("send() failed to queue request, exception: %1").arg(e.what())));
}
if (requestId) _mtp_internal::registerRequest(requestId, toMainDC ? -getDC() : getDC());
if (requestId) _mtp_internal::registerRequest(requestId, toMainDC ? -getDcWithShift() : getDcWithShift());
return requestId;
}