2
0
mirror of https://github.com/telegramdesktop/tdesktop synced 2025-09-05 00:46:08 +00:00

Start data export in lib_export.

This commit is contained in:
John Preston
2018-06-02 17:29:21 +03:00
parent c2fa149ffd
commit c587c011d2
52 changed files with 1994 additions and 320 deletions

View File

@@ -76,10 +76,19 @@ public:
std::unique_ptr<internal::Connection> &&connection);
void connectionFinished(internal::Connection *connection);
void registerRequest(mtpRequestId requestId, ShiftedDcId dcWithShift);
void sendRequest(
mtpRequestId requestId,
mtpRequest &&request,
RPCResponseHandler &&callbacks,
ShiftedDcId shiftedDcId,
TimeMs msCanWait,
bool needsLayer,
mtpRequestId afterRequestId);
void registerRequest(mtpRequestId requestId, ShiftedDcId shiftedDcId);
void unregisterRequest(mtpRequestId requestId);
mtpRequestId storeRequest(
mtpRequest &request,
void storeRequest(
mtpRequestId requestId,
const mtpRequest &request,
RPCResponseHandler &&callbacks);
mtpRequest getRequest(mtpRequestId requestId);
void clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids);
@@ -87,8 +96,8 @@ public:
bool hasCallbacks(mtpRequestId requestId);
void globalCallback(const mtpPrime *from, const mtpPrime *end);
void onStateChange(ShiftedDcId dcWithShift, int32 state);
void onSessionReset(ShiftedDcId dcWithShift);
void onStateChange(ShiftedDcId shiftedDcId, int32 state);
void onSessionReset(ShiftedDcId shiftedDcId);
// return true if need to clean request data
bool rpcErrorOccured(mtpRequestId requestId, const RPCFailHandlerPtr &onFail, const RPCError &err);
@@ -102,7 +111,7 @@ public:
void setSessionResetHandler(Fn<void(ShiftedDcId shiftedDcId)> handler);
void clearGlobalHandlers();
internal::Session *getSession(ShiftedDcId shiftedDcId);
not_null<internal::Session*> getSession(ShiftedDcId shiftedDcId);
bool isNormal() const {
return (_mode == Instance::Mode::Normal);
@@ -497,9 +506,7 @@ QString Instance::Private::dctransport(ShiftedDcId shiftedDcId) {
}
void Instance::Private::ping() {
if (auto session = getSession(0)) {
session->ping();
}
getSession(0)->ping();
}
void Instance::Private::cancel(mtpRequestId requestId) {
@@ -517,27 +524,23 @@ void Instance::Private::cancel(mtpRequestId requestId) {
}
unregisterRequest(requestId);
if (shiftedDcId) {
if (const auto session = getSession(qAbs(*shiftedDcId))) {
session->cancel(requestId, msgId);
}
const auto session = getSession(qAbs(*shiftedDcId));
session->cancel(requestId, msgId);
}
clearCallbacks(requestId);
}
int32 Instance::Private::state(mtpRequestId requestId) { // < 0 means waiting for such count of ms
// result < 0 means waiting for such count of ms.
int32 Instance::Private::state(mtpRequestId requestId) {
if (requestId > 0) {
if (const auto shiftedDcId = queryRequestByDc(requestId)) {
if (auto session = getSession(qAbs(*shiftedDcId))) {
return session->requestState(requestId);
}
return MTP::RequestConnecting;
const auto session = getSession(qAbs(*shiftedDcId));
return session->requestState(requestId);
}
return MTP::RequestSent;
}
if (auto session = getSession(-requestId)) {
return session->requestState(0);
}
return MTP::RequestConnecting;
const auto session = getSession(-requestId);
return session->requestState(0);
}
void Instance::Private::killSession(ShiftedDcId shiftedDcId) {
@@ -837,9 +840,8 @@ void Instance::Private::checkDelayedRequests() {
}
request = it->second;
}
if (auto session = getSession(qAbs(dcWithShift))) {
session->sendPrepared(request);
}
const auto session = getSession(qAbs(dcWithShift));
session->sendPrepared(request);
}
if (!_delayedRequests.empty()) {
@@ -847,11 +849,38 @@ void Instance::Private::checkDelayedRequests() {
}
}
void Instance::Private::sendRequest(
mtpRequestId requestId,
mtpRequest &&request,
RPCResponseHandler &&callbacks,
ShiftedDcId shiftedDcId,
TimeMs msCanWait,
bool needsLayer,
mtpRequestId afterRequestId) {
const auto session = getSession(shiftedDcId);
request->requestId = requestId;
storeRequest(requestId, request, std::move(callbacks));
const auto toMainDc = (shiftedDcId == 0);
const auto realShiftedDcId = session->getDcWithShift();
const auto signedDcId = toMainDc ? -realShiftedDcId : realShiftedDcId;
registerRequest(requestId, signedDcId);
if (afterRequestId) {
request->after = getRequest(afterRequestId);
}
request->msDate = getms(true); // > 0 - can send without container
request->needsLayer = needsLayer;
session->sendPrepared(request, msCanWait);
}
void Instance::Private::registerRequest(
mtpRequestId requestId,
ShiftedDcId dcWithShift) {
ShiftedDcId shiftedDcId) {
QMutexLocker locker(&_requestByDcLock);
_requestsByDc.emplace(requestId, dcWithShift);
_requestsByDc.emplace(requestId, shiftedDcId);
}
void Instance::Private::unregisterRequest(mtpRequestId requestId) {
@@ -866,11 +895,10 @@ void Instance::Private::unregisterRequest(mtpRequestId requestId) {
_requestsByDc.erase(requestId);
}
mtpRequestId Instance::Private::storeRequest(
mtpRequest &request,
void Instance::Private::storeRequest(
mtpRequestId requestId,
const mtpRequest &request,
RPCResponseHandler &&callbacks) {
const auto requestId = reqid();
request->requestId = requestId;
if (callbacks.onDone || callbacks.onFail) {
QMutexLocker locker(&_parserMapLock);
_parserMap.emplace(requestId, std::move(callbacks));
@@ -879,7 +907,6 @@ mtpRequestId Instance::Private::storeRequest(
QWriteLocker locker(&_requestMapLock);
_requestMap.emplace(requestId, request);
}
return requestId;
}
mtpRequest Instance::Private::getRequest(mtpRequestId requestId) {
@@ -1075,9 +1102,8 @@ void Instance::Private::importDone(const MTPauth_Authorization &result, mtpReque
_instance->setMainDcId(newdc);
}
DEBUG_LOG(("MTP Info: resending request %1 to dc %2 after import auth").arg(waitedRequestId).arg(*shiftedDcId));
if (auto session = getSession(*shiftedDcId)) {
session->sendPrepared(it->second);
}
const auto session = getSession(*shiftedDcId);
session->sendPrepared(it->second);
}
waiters.clear();
}
@@ -1190,12 +1216,11 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e
}
request = it->second;
}
if (auto session = getSession(newdcWithShift)) {
registerRequest(
requestId,
(dcWithShift < 0) ? -newdcWithShift : newdcWithShift);
session->sendPrepared(request);
}
const auto session = getSession(newdcWithShift);
registerRequest(
requestId,
(dcWithShift < 0) ? -newdcWithShift : newdcWithShift);
session->sendPrepared(request);
return true;
} else if (code < 0 || code >= 500 || (m = QRegularExpression("^FLOOD_WAIT_(\\d+)$").match(err)).hasMatch()) {
if (!requestId) return false;
@@ -1270,10 +1295,9 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e
}
if (!dcWithShift) return false;
if (auto session = getSession(qAbs(dcWithShift))) {
request->needsLayer = true;
session->sendPrepared(request);
}
const auto session = getSession(qAbs(dcWithShift));
request->needsLayer = true;
session->sendPrepared(request);
return true;
} else if (err == qstr("CONNECTION_LANG_CODE_INVALID")) {
Lang::CurrentCloudManager().resetToDefault();
@@ -1308,10 +1332,9 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e
if (!dcWithShift) return false;
if (!request->after) {
if (auto session = getSession(qAbs(dcWithShift))) {
request->needsLayer = true;
session->sendPrepared(request);
}
const auto session = getSession(qAbs(dcWithShift));
request->needsLayer = true;
session->sendPrepared(request);
} else {
auto newdc = bareDcId(qAbs(dcWithShift));
auto &waiters(_authWaiters[newdc]);
@@ -1343,7 +1366,8 @@ bool Instance::Private::onErrorDefault(mtpRequestId requestId, const RPCError &e
return false;
}
internal::Session *Instance::Private::getSession(ShiftedDcId shiftedDcId) {
not_null<internal::Session*> Instance::Private::getSession(
ShiftedDcId shiftedDcId) {
if (!shiftedDcId) {
Assert(_mainSession != nullptr);
return _mainSession;
@@ -1600,28 +1624,12 @@ void Instance::clearGlobalHandlers() {
_private->clearGlobalHandlers();
}
void Instance::onStateChange(ShiftedDcId dcWithShift, int32 state) {
_private->onStateChange(dcWithShift, state);
void Instance::onStateChange(ShiftedDcId shiftedDcId, int32 state) {
_private->onStateChange(shiftedDcId, state);
}
void Instance::onSessionReset(ShiftedDcId dcWithShift) {
_private->onSessionReset(dcWithShift);
}
void Instance::registerRequest(
mtpRequestId requestId,
ShiftedDcId dcWithShift) {
_private->registerRequest(requestId, dcWithShift);
}
mtpRequestId Instance::storeRequest(
mtpRequest &request,
RPCResponseHandler &&callbacks) {
return _private->storeRequest(request, std::move(callbacks));
}
mtpRequest Instance::getRequest(mtpRequestId requestId) {
return _private->getRequest(requestId);
void Instance::onSessionReset(ShiftedDcId shiftedDcId) {
_private->onSessionReset(shiftedDcId);
}
void Instance::clearCallbacksDelayed(std::vector<RPCCallbackClear> &&ids) {
@@ -1655,29 +1663,27 @@ void Instance::scheduleKeyDestroy(ShiftedDcId shiftedDcId) {
void Instance::onKeyDestroyed(qint32 shiftedDcId) {
_private->completedKeyDestroy(shiftedDcId);
}
mtpRequestId Instance::send(
void Instance::sendRequest(
mtpRequestId requestId,
mtpRequest &&request,
RPCResponseHandler &&callbacks,
ShiftedDcId dcId,
ShiftedDcId shiftedDcId,
TimeMs msCanWait,
mtpRequestId after) {
if (const auto session = _private->getSession(dcId)) {
return session->send(
mtpRequestData::serialize(request),
std::move(callbacks),
msCanWait,
true,
!dcId,
after);
}
return 0;
bool needsLayer,
mtpRequestId afterRequestId) {
return _private->sendRequest(
requestId,
std::move(request),
std::move(callbacks),
shiftedDcId,
msCanWait,
needsLayer,
afterRequestId);
}
void Instance::sendAnything(ShiftedDcId dcId, TimeMs msCanWait) {
if (const auto session = _private->getSession(dcId)) {
session->sendAnything(msCanWait);
}
void Instance::sendAnything(ShiftedDcId shiftedDcId, TimeMs msCanWait) {
const auto session = _private->getSession(shiftedDcId);
session->sendAnything(msCanWait);
}
Instance::~Instance() {