2
0
mirror of https://github.com/telegramdesktop/tdesktop synced 2025-08-30 22:16:14 +00:00

API scheme updated to layer 66.

Support CDN file download.
This commit is contained in:
John Preston
2017-03-23 19:11:35 +03:00
parent 7dd24a30b5
commit 8d28d0691f
26 changed files with 873 additions and 418 deletions

View File

@@ -340,34 +340,13 @@ private:
};
typedef QMap<uint64, RSAPublicKey> RSAPublicKeys;
RSAPublicKeys InitRSAPublicKeys() {
DEBUG_LOG(("MTP Info: RSA public keys list creation"));
RSAPublicKeys result;
int keysCount;
const char **keys = cPublicRSAKeys(keysCount);
for (int i = 0; i < keysCount; ++i) {
RSAPublicKey key(keys[i]);
if (key.isValid()) {
result.insert(key.getFingerPrint(), key);
} else {
LOG(("MTP Error: could not read this public RSA key:"));
LOG((keys[i]));
}
}
DEBUG_LOG(("MTP Info: read %1 public RSA keys").arg(result.size()));
return result;
}
} // namespace
Connection::Connection(Instance *instance) : _instance(instance) {
}
void Connection::start(SessionData *sessionData, ShiftedDcId shiftedDcId) {
t_assert(thread == nullptr && data == nullptr);
Expects(thread == nullptr && data == nullptr);
thread = std::make_unique<Thread>();
auto newData = std::make_unique<ConnectionPrivate>(_instance, thread.get(), this, sessionData, shiftedDcId);
@@ -378,14 +357,14 @@ void Connection::start(SessionData *sessionData, ShiftedDcId shiftedDcId) {
}
void Connection::kill() {
t_assert(data != nullptr && thread != nullptr);
Expects(data != nullptr && thread != nullptr);
data->stop();
data = nullptr;
thread->quit();
}
void Connection::waitTillFinish() {
t_assert(data == nullptr && thread != nullptr);
Expects(data == nullptr && thread != nullptr);
DEBUG_LOG(("Waiting for connectionThread to finish"));
thread->wait();
@@ -393,19 +372,19 @@ void Connection::waitTillFinish() {
}
int32 Connection::state() const {
t_assert(data != nullptr && thread != nullptr);
Expects(data != nullptr && thread != nullptr);
return data->getState();
}
QString Connection::transport() const {
t_assert(data != nullptr && thread != nullptr);
Expects(data != nullptr && thread != nullptr);
return data->transport();
}
Connection::~Connection() {
t_assert(data == nullptr);
Expects(data == nullptr);
if (thread) {
waitTillFinish();
}
@@ -477,10 +456,10 @@ ConnectionPrivate::ConnectionPrivate(Instance *instance, QThread *thread, Connec
retryTimer.moveToThread(thread);
moveToThread(thread);
t_assert(_shiftedDcId != 0);
Expects(_shiftedDcId != 0);
connect(thread, SIGNAL(started()), this, SLOT(socketStart()));
connect(thread, SIGNAL(finished()), this, SLOT(doFinish()));
connect(thread, &QThread::started, this, [this] { connectToServer(); });
connect(thread, &QThread::finished, this, [this] { finishAndDestroy(); });
connect(this, SIGNAL(finished(internal::Connection*)), _instance, SLOT(connectionFinished(internal::Connection*)), Qt::QueuedConnection);
connect(&retryTimer, SIGNAL(timeout()), this, SLOT(retryByTimer()));
@@ -515,7 +494,11 @@ ConnectionPrivate::ConnectionPrivate(Instance *instance, QThread *thread, Connec
}
void ConnectionPrivate::onConfigLoaded() {
socketStart(true);
connectToServer(true);
}
void ConnectionPrivate::onCDNConfigLoaded() {
restart();
}
int32 ConnectionPrivate::getShiftedDcId() const {
@@ -881,12 +864,14 @@ void ConnectionPrivate::tryToSend() {
}
}
MTPInitConnection<mtpRequest> initWrapperImpl, *initWrapper = &initWrapperImpl;
MTPInitConnection<mtpRequest> initWrapper;
int32 initSize = 0, initSizeInInts = 0;
if (needsLayer) {
auto langCode = (cLang() == languageTest || cLang() == languageDefault) ? Sandbox::LangSystemISO() : str_const_toString(LanguageCodes[cLang()]);
initWrapperImpl = MTPInitConnection<mtpRequest>(MTP_int(ApiId), MTP_string(cApiDeviceModel()), MTP_string(cApiSystemVersion()), MTP_string(cApiAppVersion()), MTP_string(langCode), mtpRequest());
initSizeInInts = (initWrapper->innerLength() >> 2) + 2;
auto deviceModel = (_dcType == DcType::Cdn) ? "n/a" : cApiDeviceModel();
auto systemVersion = (_dcType == DcType::Cdn) ? "n/a" : cApiSystemVersion();
initWrapper = MTPInitConnection<mtpRequest>(MTP_int(ApiId), MTP_string(deviceModel), MTP_string(systemVersion), MTP_string(cApiAppVersion()), MTP_string(langCode), mtpRequest());
initSizeInInts = (initWrapper.innerLength() >> 2) + 2;
initSize = initSizeInInts * sizeof(mtpPrime);
}
@@ -946,7 +931,7 @@ void ConnectionPrivate::tryToSend() {
memcpy(wrappedRequest->data(), toSendRequest->constData(), 7 * sizeof(mtpPrime)); // all except length
wrappedRequest->push_back(mtpc_invokeWithLayer);
wrappedRequest->push_back(MTP::internal::CurrentLayer);
initWrapper->write(*wrappedRequest);
initWrapper.write(*wrappedRequest);
wrappedRequest->resize(wrappedRequest->size() + noWrapSize);
memcpy(wrappedRequest->data() + wrappedRequest->size() - noWrapSize, toSendRequest->constData() + 8, noWrapSize * sizeof(mtpPrime));
toSendRequest = wrappedRequest;
@@ -978,7 +963,7 @@ void ConnectionPrivate::tryToSend() {
initSerialized.reserve(initSizeInInts);
initSerialized.push_back(mtpc_invokeWithLayer);
initSerialized.push_back(MTP::internal::CurrentLayer);
initWrapper->write(initSerialized);
initWrapper.write(initSerialized);
}
toSendRequest = mtpRequestData::prepare(containerSize, containerSize + 3 * toSend.size()); // prepare container + each in invoke after
toSendRequest->push_back(mtpc_msg_container);
@@ -1082,7 +1067,7 @@ void ConnectionPrivate::retryByTimer() {
}
keyId = 0;
}
socketStart();
connectToServer();
}
void ConnectionPrivate::restartNow() {
@@ -1091,27 +1076,31 @@ void ConnectionPrivate::restartNow() {
restart();
}
void ConnectionPrivate::socketStart(bool afterConfig) {
void ConnectionPrivate::connectToServer(bool afterConfig) {
if (_finished) {
DEBUG_LOG(("MTP Error: socketStart() called for finished connection!"));
DEBUG_LOG(("MTP Error: connectToServer() called for finished connection!"));
return;
}
auto dcType = DcOptions::DcType::Regular;
auto isDownloadDc = isDownloadDcId(_shiftedDcId);
if (isDownloadDc) { // using media_only addresses only if key for this dc is already created
auto bareDc = bareDcId(_shiftedDcId);
_dcType = Messenger::Instance().dcOptions()->dcType(_shiftedDcId);
if (_dcType == DcType::MediaDownload) { // using media_only addresses only if key for this dc is already created
QReadLocker lockFinished(&sessionDataMutex);
if (!sessionData || sessionData->getKey()) {
dcType = DcOptions::DcType::MediaDownload;
if (!sessionData || !sessionData->getKey()) {
_dcType = DcType::Regular;
}
} else if (_dcType == DcType::Cdn && !_instance->isKeysDestroyer()) {
if (!Messenger::Instance().dcOptions()->hasCDNKeysForDc(bareDc)) {
requestCDNConfig();
return;
}
}
auto bareDc = bareDcId(_shiftedDcId);
using Variants = DcOptions::Variants;
auto kIPv4 = Variants::IPv4;
auto kIPv6 = Variants::IPv6;
auto kTcp = Variants::Tcp;
auto kHttp = Variants::Http;
auto variants = Messenger::Instance().dcOptions()->lookup(bareDc, dcType);
auto variants = Messenger::Instance().dcOptions()->lookup(bareDc, _dcType);
auto noIPv4 = (variants.data[kIPv4][kHttp].port == 0);
auto noIPv6 = (!Global::TryIPv6() || (variants.data[kIPv6][kHttp].port == 0));
if (noIPv4 && noIPv6) {
@@ -1128,7 +1117,7 @@ void ConnectionPrivate::socketStart(bool afterConfig) {
DEBUG_LOG(("MTP Info: DC %1 options for IPv4 over HTTP not found, waiting for config").arg(_shiftedDcId));
if (Global::TryIPv6() && noIPv6) DEBUG_LOG(("MTP Info: DC %1 options for IPv6 over HTTP not found, waiting for config").arg(_shiftedDcId));
connect(_instance, SIGNAL(configLoaded()), this, SLOT(onConfigLoaded()), Qt::UniqueConnection);
QMetaObject::invokeMethod(_instance, "configLoadRequest", Qt::QueuedConnection);
InvokeQueued(_instance, [instance = _instance] { instance->configLoadRequest(); });
return;
}
@@ -1201,18 +1190,18 @@ void ConnectionPrivate::restart() {
void ConnectionPrivate::onSentSome(uint64 size) {
if (!_waitForReceivedTimer.isActive()) {
uint64 remain = _waitForReceived;
auto remain = static_cast<uint64>(_waitForReceived);
if (!oldConnection) {
uint64 remainBySize = size * _waitForReceived / 8192; // 8kb / sec, so 512 kb give 64 sec
auto remainBySize = size * _waitForReceived / 8192; // 8kb / sec, so 512 kb give 64 sec
remain = snap(remainBySize, remain, uint64(MTPMaxReceiveDelay));
if (remain != _waitForReceived) {
DEBUG_LOG(("Checking connect for request with size %1 bytes, delay will be %2").arg(size).arg(remain));
}
}
if (isUploadDcId(_shiftedDcId)) {
remain *= MTPUploadSessionsCount;
remain *= kUploadSessionsCount;
} else if (isDownloadDcId(_shiftedDcId)) {
remain *= MTPDownloadSessionsCount;
remain *= kDownloadSessionsCount;
}
_waitForReceivedTimer.start(remain);
}
@@ -1276,7 +1265,7 @@ void ConnectionPrivate::onWaitReceivedFailed() {
if (retryTimer.isActive()) return;
DEBUG_LOG(("MTP Info: immediate restart!"));
QTimer::singleShot(0, this, SLOT(socketStart()));
InvokeQueued(this, [this] { connectToServer(); });
}
void ConnectionPrivate::onWaitConnectedFailed() {
@@ -1287,7 +1276,7 @@ void ConnectionPrivate::onWaitConnectedFailed() {
restarted = true;
DEBUG_LOG(("MTP Info: immediate restart!"));
QTimer::singleShot(0, this, SLOT(socketStart()));
InvokeQueued(this, [this] { connectToServer(); });
}
void ConnectionPrivate::onWaitIPv4Failed() {
@@ -1319,13 +1308,18 @@ void ConnectionPrivate::doDisconnect() {
restarted = false;
}
void ConnectionPrivate::doFinish() {
void ConnectionPrivate::finishAndDestroy() {
doDisconnect();
_finished = true;
emit finished(_owner);
deleteLater();
}
void ConnectionPrivate::requestCDNConfig() {
connect(_instance, SIGNAL(cdnConfigLoaded()), this, SLOT(onCDNConfigLoaded()), Qt::UniqueConnection);
InvokeQueued(_instance, [instance = _instance] { instance->cdnConfigLoadRequest(); });
}
void ConnectionPrivate::handleReceived() {
QReadLocker lockFinished(&sessionDataMutex);
if (!sessionData) return;
@@ -2046,16 +2040,20 @@ ConnectionPrivate::HandleResult ConnectionPrivate::handleOneReceived(const mtpPr
return HandleResult::ResetSession;
}
mtpBuffer update(end - from);
if (end > from) memcpy(update.data(), from, (end - from) * sizeof(mtpPrime));
if (_dcType == DcType::Regular) {
mtpBuffer update(end - from);
if (end > from) memcpy(update.data(), from, (end - from) * sizeof(mtpPrime));
QWriteLocker locker(sessionData->haveReceivedMutex());
mtpResponseMap &haveReceived(sessionData->haveReceivedMap());
mtpRequestId fakeRequestId = sessionData->nextFakeRequestId();
haveReceived.insert(fakeRequestId, mtpResponse(update)); // notify main process about new updates
QWriteLocker locker(sessionData->haveReceivedMutex());
mtpResponseMap &haveReceived(sessionData->haveReceivedMap());
mtpRequestId fakeRequestId = sessionData->nextFakeRequestId();
haveReceived.insert(fakeRequestId, mtpResponse(update)); // notify main process about new updates
if (cons != mtpc_updatesTooLong && cons != mtpc_updateShortMessage && cons != mtpc_updateShortChatMessage && cons != mtpc_updateShortSentMessage && cons != mtpc_updateShort && cons != mtpc_updatesCombined && cons != mtpc_updates) {
LOG(("Message Error: unknown constructor %1").arg(cons)); // maybe new api?..
if (cons != mtpc_updatesTooLong && cons != mtpc_updateShortMessage && cons != mtpc_updateShortChatMessage && cons != mtpc_updateShortSentMessage && cons != mtpc_updateShort && cons != mtpc_updatesCombined && cons != mtpc_updates) {
LOG(("Message Error: unknown constructor %1").arg(cons)); // maybe new api?..
}
} else {
LOG(("Message Error: unexpected updates in dcType: %1").arg(static_cast<int>(_dcType)));
}
return HandleResult::Success;
@@ -2430,27 +2428,17 @@ void ConnectionPrivate::pqAnswered() {
return restart();
}
static MTP::internal::RSAPublicKeys RSAKeys = MTP::internal::InitRSAPublicKeys();
const MTP::internal::RSAPublicKey *rsaKey = nullptr;
auto &fingerPrints = res_pq.c_resPQ().vserver_public_key_fingerprints.v;
for (auto &fingerPrint : fingerPrints) {
auto it = RSAKeys.constFind(static_cast<uint64>(fingerPrint.v));
if (it != RSAKeys.cend()) {
rsaKey = &it.value();
break;
auto rsaKey = internal::RSAPublicKey();
if (!Messenger::Instance().dcOptions()->getDcRSAKey(bareDcId(_shiftedDcId), res_pq.c_resPQ().vserver_public_key_fingerprints.v, &rsaKey)) {
if (_dcType == DcType::Cdn) {
LOG(("Warning: CDN public RSA key not found"));
requestCDNConfig();
return;
}
}
if (!rsaKey) {
QStringList suggested, my;
for (auto &fingerPrint : fingerPrints) {
suggested.push_back(QString("%1").arg(fingerPrint.v));
}
for (auto i = RSAKeys.cbegin(), e = RSAKeys.cend(); i != e; ++i) {
my.push_back(QString("%1").arg(i.key()));
}
LOG(("AuthKey Error: could not choose public RSA key, suggested fingerprints: %1, my fingerprints: %2").arg(suggested.join(", ")).arg(my.join(", ")));
LOG(("AuthKey Error: could not choose public RSA key"));
return restart();
}
t_assert(rsaKey.isValid());
_authKeyData->server_nonce = res_pq_data.vserver_nonce;
_authKeyData->new_nonce = rand_value<MTPint256>();
@@ -2477,14 +2465,14 @@ void ConnectionPrivate::pqAnswered() {
MTPReq_DH_params req_DH_params;
req_DH_params.vnonce = _authKeyData->nonce;
req_DH_params.vserver_nonce = _authKeyData->server_nonce;
req_DH_params.vpublic_key_fingerprint = MTP_long(rsaKey->getFingerPrint());
req_DH_params.vpublic_key_fingerprint = MTP_long(rsaKey.getFingerPrint());
req_DH_params.vp = p_q_inner.c_p_q_inner_data().vp;
req_DH_params.vq = p_q_inner.c_p_q_inner_data().vq;
req_DH_params.vencrypted_data = MTP_string(std::move(dhEncString));
sendRequestNotSecure(req_DH_params);
}
std::string ConnectionPrivate::encryptPQInnerRSA(const MTPP_Q_inner_data &data, const MTP::internal::RSAPublicKey *key) {
std::string ConnectionPrivate::encryptPQInnerRSA(const MTPP_Q_inner_data &data, const MTP::internal::RSAPublicKey &key) {
auto p_q_inner_size = data.innerLength();
auto encSize = (p_q_inner_size >> 2) + 6;
if (encSize >= 65) {
@@ -2509,7 +2497,7 @@ std::string ConnectionPrivate::encryptPQInnerRSA(const MTPP_Q_inner_data &data,
}
auto dhEncString = std::string();
if (!key->encrypt(reinterpret_cast<const char*>(&encBuffer[0]) + 3, dhEncString)) {
if (!key.encrypt(reinterpret_cast<const char*>(&encBuffer[0]) + 3, dhEncString)) {
return std::string();
}
return dhEncString;
@@ -2872,17 +2860,7 @@ void ConnectionPrivate::onError4(qint32 errorCode) {
LOG(("Protocol Error: -429 flood code returned!"));
}
if (_conn || !_conn6) {
destroyConn();
_waitForConnectedTimer.stop();
if (errorCode == -404 && _instance->isKeysDestroyer()) {
LOG(("MTP Info: -404 error received on destroying key %1, assuming it is destroyed.").arg(_shiftedDcId));
emit _instance->keyDestroyed(_shiftedDcId);
return;
} else {
MTP_LOG(_shiftedDcId, ("Restarting after error in IPv4 connection, error code: %1...").arg(errorCode));
return restart();
}
handleError(errorCode);
} else {
destroyConn(&_conn4);
}
@@ -2895,22 +2873,32 @@ void ConnectionPrivate::onError6(qint32 errorCode) {
LOG(("Protocol Error: -429 flood code returned!"));
}
if (_conn || !_conn4) {
destroyConn();
_waitForConnectedTimer.stop();
if (errorCode == -404 && _instance->isKeysDestroyer()) {
LOG(("MTP Info: -404 error received on destroying key %1, assuming it is destroyed.").arg(_shiftedDcId));
emit _instance->keyDestroyed(_shiftedDcId);
return;
} else {
MTP_LOG(_shiftedDcId, ("Restarting after error in IPv6 connection, error code: %1...").arg(errorCode));
return restart();
}
handleError(errorCode);
} else {
destroyConn(&_conn6);
}
}
void ConnectionPrivate::handleError(int errorCode) {
destroyConn();
_waitForConnectedTimer.stop();
if (errorCode == -404) {
if (_instance->isKeysDestroyer()) {
LOG(("MTP Info: -404 error received on destroying key %1, assuming it is destroyed.").arg(_shiftedDcId));
emit _instance->keyDestroyed(_shiftedDcId);
return;
} else if (_dcType == DcType::Cdn) {
LOG(("MTP Info: -404 error received in CDN dc %1, assuming it was destroyed, recreating.").arg(_shiftedDcId));
clearMessages();
keyId = kRecreateKeyId;
return restart();
}
}
MTP_LOG(_shiftedDcId, ("Restarting after error in connection, error code: %1...").arg(errorCode));
return restart();
}
void ConnectionPrivate::onReadyData() {
}