2
0
mirror of https://github.com/telegramdesktop/tdesktop synced 2025-09-02 15:35:51 +00:00

Use split ranges to export all messages.

This commit is contained in:
John Preston
2018-06-21 15:01:27 +01:00
parent 36fb6dac89
commit d056c00c67
8 changed files with 231 additions and 97 deletions

View File

@@ -15,6 +15,7 @@ https://github.com/telegramdesktop/tdesktop/blob/master/LEGAL
#include "base/value_ordering.h"
#include "base/bytes.h"
#include <set>
#include <deque>
namespace Export {
@@ -107,10 +108,12 @@ struct ApiWrap::StartProcess {
enum class Step {
UserpicsCount,
SplitRanges,
DialogsCount,
LeftChannelsCount,
};
std::deque<Step> steps;
int splitIndex = 0;
StartInfo info;
};
@@ -160,23 +163,23 @@ struct ApiWrap::FileProgress {
int total = 0;
};
struct ApiWrap::LeftChannelsProcess {
struct ApiWrap::ChatsProcess {
Fn<bool(int count)> progress;
FnMut<void(Data::DialogsInfo&&)> done;
Data::DialogsInfo info;
int processedCount = 0;
std::map<Data::PeerId, int> indexByPeer;
};
struct ApiWrap::LeftChannelsProcess : ChatsProcess {
int fullCount = 0;
int offset = 0;
bool finished = false;
};
struct ApiWrap::DialogsProcess {
Fn<bool(int count)> progress;
FnMut<void(Data::DialogsInfo&&)> done;
Data::DialogsInfo info;
struct ApiWrap::DialogsProcess : ChatsProcess {
int splitIndexPlusOne = 0;
Data::TimeId offsetDate = 0;
int32 offsetId = 0;
MTPInputPeer offsetPeer = MTP_inputPeerEmpty();
@@ -190,7 +193,8 @@ struct ApiWrap::ChatProcess {
Fn<bool(Data::MessagesSlice&&)> handleSlice;
FnMut<void()> done;
int32 offsetId = 1;
int localSplitIndex = 0;
int32 largestIdPlusOne = 1;
Data::ParseMediaContext context;
base::optional<Data::MessagesSlice> slice;
@@ -234,12 +238,24 @@ auto ApiWrap::mainRequest(Request &&request) {
return std::move(_mtp.request(MTPInvokeWithTakeout<Request>(
MTP_long(*_takeoutId),
request
std::forward<Request>(request)
)).fail([=](RPCError &&result) {
error(std::move(result));
}).toDC(MTP::ShiftDcId(0, MTP::kExportDcShift)));
}
template <typename Request>
auto ApiWrap::splitRequest(int index, Request &&request) {
Expects(index < _splits.size());
//if (index == _splits.size() - 1) {
// return mainRequest(std::forward<Request>(request));
//}
return mainRequest(MTPInvokeWithMessagesRange<Request>(
_splits[index],
std::forward<Request>(request)));
}
auto ApiWrap::fileRequest(const Data::FileLocation &location, int offset) {
Expects(location.dcId != 0);
Expects(_takeoutId.has_value());
@@ -284,6 +300,9 @@ void ApiWrap::startExport(
if (_settings->types & Settings::Type::Userpics) {
_startProcess->steps.push_back(Step::UserpicsCount);
}
if (_settings->types & Settings::Type::NonChannelChatsMask) {
_startProcess->steps.push_back(Step::SplitRanges);
}
if (_settings->types & Settings::Type::AnyChatsMask) {
_startProcess->steps.push_back(Step::DialogsCount);
}
@@ -303,14 +322,17 @@ void ApiWrap::sendNextStartRequest() {
finishStartProcess();
return;
}
using Step = StartProcess::Step;
const auto step = steps.front();
steps.pop_front();
switch (step) {
case StartProcess::Step::UserpicsCount:
case Step::UserpicsCount:
return requestUserpicsCount();
case StartProcess::Step::DialogsCount:
case Step::SplitRanges:
return requestSplitRanges();
case Step::DialogsCount:
return requestDialogsCount();
case StartProcess::Step::LeftChannelsCount:
case Step::LeftChannelsCount:
return requestLeftChannelsCount();
}
Unexpected("Step in ApiWrap::sendNextStartRequest.");
@@ -339,10 +361,20 @@ void ApiWrap::requestUserpicsCount() {
}).send();
}
void ApiWrap::requestSplitRanges() {
Expects(_startProcess != nullptr);
mainRequest(MTPmessages_GetSplitRanges(
)).done([=](const MTPVector<MTPMessageRange> &result) {
_splits = result.v;
sendNextStartRequest();
}).send();
}
void ApiWrap::requestDialogsCount() {
Expects(_startProcess != nullptr);
mainRequest(MTPmessages_GetDialogs(
splitRequest(_startProcess->splitIndex, MTPmessages_GetDialogs(
MTP_flags(0),
MTP_int(0), // offset_date
MTP_int(0), // offset_id
@@ -352,14 +384,18 @@ void ApiWrap::requestDialogsCount() {
Expects(_settings != nullptr);
Expects(_startProcess != nullptr);
_startProcess->info.dialogsCount = result.match(
_startProcess->info.dialogsCount += result.match(
[](const MTPDmessages_dialogs &data) {
return int(data.vdialogs.v.size());
}, [](const MTPDmessages_dialogsSlice &data) {
return data.vcount.v;
});
sendNextStartRequest();
if (++_startProcess->splitIndex >= _splits.size()) {
sendNextStartRequest();
} else {
requestDialogsCount();
}
}).send();
}
@@ -390,6 +426,8 @@ void ApiWrap::requestLeftChannelsList(
FnMut<void(Data::DialogsInfo&&)> done) {
Expects(_leftChannelsProcess != nullptr);
validateSplits();
_leftChannelsProcess->progress = std::move(progress);
_leftChannelsProcess->done = std::move(done);
requestLeftChannelsSlice();
@@ -414,13 +452,24 @@ void ApiWrap::requestDialogsList(
FnMut<void(Data::DialogsInfo&&)> done) {
Expects(_dialogsProcess == nullptr);
validateSplits();
_dialogsProcess = std::make_unique<DialogsProcess>();
_dialogsProcess->splitIndexPlusOne = _splits.size();
_dialogsProcess->progress = std::move(progress);
_dialogsProcess->done = std::move(done);
requestDialogsSlice();
}
void ApiWrap::validateSplits() {
if (_splits.empty()) {
_splits.push_back(MTP_messageRange(
MTP_int(0),
MTP_int(std::numeric_limits<int>::max())));
}
}
void ApiWrap::startMainSession(FnMut<void()> done) {
const auto sizeLimit = _settings->media.sizeLimit;
const auto hasFiles = (_settings->media.types != 0) && (sizeLimit > 0);
@@ -699,11 +748,41 @@ void ApiWrap::requestMessages(
_chatProcess->handleSlice = std::move(slice);
_chatProcess->done = std::move(done);
requestMessagesSlice([=](int count) {
requestMessagesCount(0);
}
void ApiWrap::requestMessagesCount(int localSplitIndex) {
Expects(_chatProcess != nullptr);
Expects(localSplitIndex < _chatProcess->info.splits.size());
requestChatMessages(
_chatProcess->info.splits[localSplitIndex],
0, // offset_id
0, // add_offset
1, // limit
[=](const MTPmessages_Messages &result) {
Expects(_chatProcess != nullptr);
_chatProcess->info.messagesCount = count;
return _chatProcess->start(_chatProcess->info);
const auto count = result.match(
[](const MTPDmessages_messages &data) {
return data.vmessages.v.size();
}, [](const MTPDmessages_messagesSlice &data) {
return data.vcount.v;
}, [](const MTPDmessages_channelMessages &data) {
return data.vcount.v;
}, [](const MTPDmessages_messagesNotModified &data) {
return -1;
});
if (count < 0) {
error("Unexpected messagesNotModified received.");
return;
}
_chatProcess->info.messagesCountPerSplit[localSplitIndex] = count;
if (localSplitIndex + 1 < _chatProcess->info.splits.size()) {
requestMessagesCount(localSplitIndex + 1);
} else if (_chatProcess->start(_chatProcess->info)) {
requestMessagesSlice();
}
});
}
@@ -727,14 +806,15 @@ void ApiWrap::cancelExportFast() {
void ApiWrap::requestDialogsSlice() {
Expects(_dialogsProcess != nullptr);
mainRequest(MTPmessages_GetDialogs(
const auto splitIndex = _dialogsProcess->splitIndexPlusOne - 1;
splitRequest(splitIndex, MTPmessages_GetDialogs(
MTP_flags(0),
MTP_int(_dialogsProcess->offsetDate),
MTP_int(_dialogsProcess->offsetId),
_dialogsProcess->offsetPeer,
MTP_int(kChatsSliceLimit)
)).done([=](const MTPmessages_Dialogs &result) {
const auto finished = result.match(
auto finished = result.match(
[](const MTPDmessages_dialogs &data) {
return true;
}, [](const MTPDmessages_dialogsSlice &data) {
@@ -742,30 +822,40 @@ void ApiWrap::requestDialogsSlice() {
});
auto info = Data::ParseDialogsInfo(result);
if (finished || info.list.empty()) {
finishDialogsList();
} else {
const auto &last = info.list.back();
_dialogsProcess->processedCount += info.list.size();
const auto last = info.list.empty()
? Data::DialogInfo()
: info.list.back();
appendDialogsSlice(std::move(info));
if (!_dialogsProcess->progress(_dialogsProcess->processedCount)) {
return;
}
if (!finished && last.topMessageDate > 0) {
_dialogsProcess->offsetId = last.topMessageId;
_dialogsProcess->offsetDate = last.topMessageDate;
_dialogsProcess->offsetPeer = last.input;
appendDialogsSlice(std::move(info));
const auto count = _dialogsProcess->info.list.size();
if (!_dialogsProcess->progress(count)) {
return;
}
requestDialogsSlice();
} else if (--_dialogsProcess->splitIndexPlusOne > 0) {
_dialogsProcess->offsetId = 0;
_dialogsProcess->offsetDate = 0;
_dialogsProcess->offsetPeer = MTP_inputPeerEmpty();
} else {
finishDialogsList();
return;
}
requestDialogsSlice();
}).send();
}
void ApiWrap::appendDialogsSlice(Data::DialogsInfo &&info) {
Expects(_dialogsProcess != nullptr);
Expects(_dialogsProcess->splitIndexPlusOne <= _splits.size());
appendChatsSlice(_dialogsProcess->info, std::move(info));
appendChatsSlice(
*_dialogsProcess,
std::move(info),
_dialogsProcess->splitIndexPlusOne - 1);
}
void ApiWrap::finishDialogsList() {
@@ -822,13 +912,18 @@ void ApiWrap::requestLeftChannelsSliceGeneric(FnMut<void()> done) {
void ApiWrap::appendLeftChannelsSlice(Data::DialogsInfo &&info) {
Expects(_leftChannelsProcess != nullptr);
Expects(!_splits.empty());
appendChatsSlice(_leftChannelsProcess->info, std::move(info));
appendChatsSlice(
*_leftChannelsProcess,
std::move(info),
_splits.size() - 1);
}
void ApiWrap::appendChatsSlice(
Data::DialogsInfo &to,
Data::DialogsInfo &&info) {
ChatsProcess &to,
Data::DialogsInfo &&info,
int splitIndex) {
Expects(_settings != nullptr);
const auto types = _settings->types;
@@ -837,41 +932,33 @@ void ApiWrap::appendChatsSlice(
) | ranges::view::filter([&](const Data::DialogInfo &info) {
return (types & SettingsFromDialogsType(info.type)) != 0;
});
auto &list = to.list;
if (list.empty()) {
list = filtered | ranges::to_vector;
} else {
list.reserve(list.size() + info.list.size());
for (auto &info : filtered) {
auto &list = to.info.list;
list.reserve(list.size() + info.list.size());
for (auto &info : filtered) {
const auto nextIndex = list.size();
const auto [i, ok] = to.indexByPeer.emplace(info.peerId, nextIndex);
if (ok) {
list.push_back(std::move(info));
}
list[i->second].splits.push_back(splitIndex);
list[i->second].messagesCountPerSplit.push_back(0);
}
}
void ApiWrap::requestMessagesSlice(FnMut<bool(int count)> start) {
void ApiWrap::requestMessagesSlice() {
Expects(_chatProcess != nullptr);
auto handleResult = [=, start = std::move(start)](
const MTPmessages_Messages &result) mutable {
requestChatMessages(
_chatProcess->info.splits[_chatProcess->localSplitIndex],
_chatProcess->largestIdPlusOne,
-kMessagesSliceLimit,
kMessagesSliceLimit,
[=](const MTPmessages_Messages &result) {
Expects(_chatProcess != nullptr);
const auto count = result.match(
[](const MTPDmessages_messages &data) {
return data.vmessages.v.size();
}, [](const MTPDmessages_messagesSlice &data) {
return data.vcount.v;
}, [](const MTPDmessages_channelMessages &data) {
return data.vcount.v;
}, [](const MTPDmessages_messagesNotModified &data) {
return 0;
});
result.match([&](const MTPDmessages_messagesNotModified &data) {
error("Unexpected messagesNotModified received.");
}, [&](const auto &data) {
if (start && !start(count)) {
return;
}
if constexpr (MTPDmessages_messages::Is<decltype(data)>()) {
_chatProcess->lastSlice = true;
}
@@ -882,9 +969,17 @@ void ApiWrap::requestMessagesSlice(FnMut<bool(int count)> start) {
data.vchats,
_chatProcess->info.relativePath));
});
};
});
}
void ApiWrap::requestChatMessages(
int splitIndex,
int offsetId,
int addOffset,
int limit,
FnMut<void(MTPmessages_Messages&&)> done) {
if (_chatProcess->info.onlyMyMessages) {
mainRequest(MTPmessages_Search(
splitRequest(splitIndex, MTPmessages_Search(
MTP_flags(MTPmessages_Search::Flag::f_from_id),
_chatProcess->info.input,
MTP_string(""), // query
@@ -892,24 +987,24 @@ void ApiWrap::requestMessagesSlice(FnMut<bool(int count)> start) {
MTP_inputMessagesFilterEmpty(),
MTP_int(0), // min_date
MTP_int(0), // max_date
MTP_int(_chatProcess->offsetId),
MTP_int(-kMessagesSliceLimit),
MTP_int(kMessagesSliceLimit),
MTP_int(offsetId),
MTP_int(addOffset),
MTP_int(limit),
MTP_int(0), // max_id
MTP_int(0), // min_id
MTP_int(0) // hash
)).done(std::move(handleResult)).send();
)).done(std::move(done)).send();
} else {
mainRequest(MTPmessages_GetHistory(
splitRequest(splitIndex, MTPmessages_GetHistory(
_chatProcess->info.input,
MTP_int(_chatProcess->offsetId),
MTP_int(offsetId),
MTP_int(0), // offset_date
MTP_int(-kMessagesSliceLimit),
MTP_int(kMessagesSliceLimit),
MTP_int(addOffset),
MTP_int(limit),
MTP_int(0), // max_id
MTP_int(0), // min_id
MTP_int(0) // hash
)).done(std::move(handleResult)).send();
)).done(std::move(done)).send();
}
}
@@ -957,15 +1052,21 @@ void ApiWrap::finishMessagesSlice() {
auto slice = *base::take(_chatProcess->slice);
if (!slice.list.empty()) {
_chatProcess->offsetId = slice.list.back().id + 1;
_chatProcess->largestIdPlusOne = slice.list.back().id + 1;
if (!_chatProcess->handleSlice(std::move(slice))) {
return;
}
}
if (_chatProcess->lastSlice) {
finishMessages();
} else {
if (_chatProcess->lastSlice
&& (++_chatProcess->localSplitIndex
< _chatProcess->info.splits.size())) {
_chatProcess->lastSlice = false;
_chatProcess->largestIdPlusOne = 1;
}
if (!_chatProcess->lastSlice) {
requestMessagesSlice();
} else {
finishMessages();
}
}