2
0
mirror of https://github.com/kotatogram/kotatogram-desktop synced 2025-08-31 06:35:14 +00:00

Use vector queues in mtproto file downloader.

This commit is contained in:
John Preston
2019-12-02 10:28:33 +03:00
parent 524d64a462
commit 12f5ccaaa5
9 changed files with 252 additions and 272 deletions

View File

@@ -32,6 +32,8 @@ constexpr auto kKillSessionTimeout = 15 * crl::time(1000);
// Max 16 file parts downloaded at the same time, 128 KB each.
constexpr auto kMaxFileQueries = 16;
constexpr auto kMaxWaitedInConnection = 512 * 1024;
// Max 8 http[s] files downloaded at the same time.
constexpr auto kMaxWebFileQueries = 8;
@@ -43,17 +45,108 @@ constexpr auto kPartSize = 128 * 1024;
} // namespace
void Downloader::Queue::enqueue(not_null<FileLoader*> loader) {
const auto i = ranges::find(_loaders, loader);
if (i != end(_loaders)) {
return;
}
_loaders.push_back(loader);
_previousGeneration.erase(
ranges::remove(_previousGeneration, loader),
end(_previousGeneration));
}
void Downloader::Queue::remove(not_null<FileLoader*> loader) {
_loaders.erase(ranges::remove(_loaders, loader), end(_loaders));
_previousGeneration.erase(
ranges::remove(_previousGeneration, loader),
end(_previousGeneration));
}
void Downloader::Queue::resetGeneration() {
if (!_previousGeneration.empty()) {
_loaders.reserve(_loaders.size() + _previousGeneration.size());
std::copy(
begin(_previousGeneration),
end(_previousGeneration),
std::back_inserter(_loaders));
_previousGeneration.clear();
}
std::swap(_loaders, _previousGeneration);
}
FileLoader *Downloader::Queue::nextLoader() const {
auto &&all = ranges::view::concat(_loaders, _previousGeneration);
const auto i = ranges::find(all, true, &FileLoader::readyToRequest);
return (i != all.end()) ? i->get() : nullptr;
}
Downloader::Downloader(not_null<ApiWrap*> api)
: _api(api)
, _killDownloadSessionsTimer([=] { killDownloadSessions(); })
, _queueForWeb(kMaxWebFileQueries) {
, _killDownloadSessionsTimer([=] { killDownloadSessions(); }) {
}
void Downloader::clearPriorities() {
++_priority;
Downloader::~Downloader() {
killDownloadSessions();
}
void Downloader::requestedAmountIncrement(MTP::DcId dcId, int index, int amount) {
void Downloader::enqueue(not_null<FileLoader*> loader) {
const auto dcId = loader->dcId();
(dcId ? _mtprotoLoaders[dcId] : _webLoaders).enqueue(loader);
if (!_resettingGeneration) {
_resettingGeneration = true;
crl::on_main(this, [=] {
resetGeneration();
});
}
checkSendNext();
}
void Downloader::remove(not_null<FileLoader*> loader) {
const auto dcId = loader->dcId();
(dcId ? _mtprotoLoaders[dcId] : _webLoaders).remove(loader);
crl::on_main(&_api->session(), [=] { checkSendNext(); });
}
void Downloader::resetGeneration() {
_resettingGeneration = false;
for (auto &[dcId, queue] : _mtprotoLoaders) {
queue.resetGeneration();
}
_webLoaders.resetGeneration();
}
void Downloader::checkSendNext() {
for (auto &[dcId, queue] : _mtprotoLoaders) {
const auto bestIndex = [&] {
const auto i = _requestedBytesAmount.find(dcId);
if (i == end(_requestedBytesAmount)) {
return 0;
}
const auto j = ranges::min_element(i->second);
const auto inConnection = *j;
return (inConnection + kPartSize <= kMaxWaitedInConnection)
? (j - begin(i->second))
: -1;
}();
if (bestIndex < 0) {
continue;
}
if (const auto loader = queue.nextLoader()) {
loader->loadPart(bestIndex);
}
}
if (_requestedBytesAmount[0][0] < kMaxWebFileQueries) {
if (const auto loader = _webLoaders.nextLoader()) {
loader->loadPart(0);
}
}
}
void Downloader::requestedAmountIncrement(
MTP::DcId dcId,
int index,
int amount) {
Expects(index >= 0 && index < MTP::kDownloadSessionsCount);
using namespace rpl::mappers;
@@ -63,6 +156,9 @@ void Downloader::requestedAmountIncrement(MTP::DcId dcId, int index, int amount)
it = _requestedBytesAmount.emplace(dcId, RequestedInDc { { 0 } }).first;
}
it->second[index] += amount;
if (!dcId) {
return; // webLoaders.
}
if (amount > 0) {
killDownloadSessionsStop(dcId);
} else if (ranges::find_if(it->second, _1 > 0) == end(it->second)) {
@@ -70,6 +166,13 @@ void Downloader::requestedAmountIncrement(MTP::DcId dcId, int index, int amount)
}
}
int Downloader::chooseDcIndexForRequest(MTP::DcId dcId) {
const auto i = _requestedBytesAmount.find(dcId);
return (i != end(_requestedBytesAmount))
? (ranges::min_element(i->second) - begin(i->second))
: 0;
}
void Downloader::killDownloadSessionsStart(MTP::DcId dcId) {
if (!_killDownloadSessionTimes.contains(dcId)) {
_killDownloadSessionTimes.emplace(
@@ -110,35 +213,6 @@ void Downloader::killDownloadSessions() {
}
}
int Downloader::chooseDcIndexForRequest(MTP::DcId dcId) const {
auto result = 0;
auto it = _requestedBytesAmount.find(dcId);
if (it != _requestedBytesAmount.cend()) {
for (auto i = 1; i != MTP::kDownloadSessionsCount; ++i) {
if (it->second[i] < it->second[result]) {
result = i;
}
}
}
return result;
}
not_null<Downloader::Queue*> Downloader::queueForDc(MTP::DcId dcId) {
const auto i = _queuesForDc.find(dcId);
const auto result = (i != end(_queuesForDc))
? i
: _queuesForDc.emplace(dcId, Queue(kMaxFileQueries)).first;
return &result->second;
}
not_null<Downloader::Queue*> Downloader::queueForWeb() {
return &_queueForWeb;
}
Downloader::~Downloader() {
killDownloadSessions();
}
} // namespace Storage
namespace {
@@ -154,13 +228,15 @@ WebLoadMainManager *_webLoadMainManager = nullptr;
FileLoader::FileLoader(
const QString &toFile,
MTP::DcId dcId,
int32 size,
LocationType locationType,
LoadToCacheSetting toCache,
LoadFromCloudSetting fromCloud,
bool autoLoading,
uint8 cacheTag)
: _downloader(&Auth().downloader())
: _dcId(dcId)
, _downloader(&Auth().downloader())
, _autoLoading(autoLoading)
, _cacheTag(cacheTag)
, _filename(toFile)
@@ -172,6 +248,10 @@ FileLoader::FileLoader(
Expects(!_filename.isEmpty() || (_size <= Storage::kMaxFileInMemory));
}
FileLoader::~FileLoader() {
_downloader->remove(this);
}
Main::Session &FileLoader::session() const {
return _downloader->api().session();
}
@@ -257,46 +337,7 @@ void FileLoader::permitLoadFromCloud() {
}
void FileLoader::notifyAboutProgress() {
const auto queue = _queue;
emit progress(this);
LoadNextFromQueue(queue);
}
void FileLoader::LoadNextFromQueue(not_null<Queue*> queue) {
if (queue->queriesCount >= queue->queriesLimit) {
return;
}
for (auto i = queue->start; i;) {
if (i->loadPart()) {
if (queue->queriesCount >= queue->queriesLimit) {
return;
}
} else {
i = i->_next;
}
}
}
void FileLoader::removeFromQueue() {
if (!_inQueue) return;
if (_next) {
_next->_prev = _prev;
}
if (_prev) {
_prev->_next = _next;
}
if (_queue->end == this) {
_queue->end = _prev;
}
if (_queue->start == this) {
_queue->start = _next;
}
_next = _prev = nullptr;
_inQueue = false;
}
FileLoader::~FileLoader() {
removeFromQueue();
}
void FileLoader::localLoaded(
@@ -331,71 +372,7 @@ void FileLoader::start() {
return cancel(true);
}
}
auto currentPriority = _downloader->currentPriority();
FileLoader *before = nullptr, *after = nullptr;
if (_inQueue && _priority == currentPriority) {
if (!_next || _next->_priority < currentPriority) return startLoading();
after = _next;
while (after->_next && after->_next->_priority == currentPriority) {
after = after->_next;
}
} else {
_priority = currentPriority;
if (_inQueue) {
if (_next && _next->_priority == currentPriority) {
after = _next;
} else if (_prev && _prev->_priority < currentPriority) {
before = _prev;
while (before->_prev && before->_prev->_priority < currentPriority) {
before = before->_prev;
}
} else {
return startLoading();
}
} else {
if (_queue->start && _queue->start->_priority == currentPriority) {
after = _queue->start;
} else {
before = _queue->start;
}
}
if (after) {
while (after->_next && after->_next->_priority == currentPriority) {
after = after->_next;
}
}
}
removeFromQueue();
_inQueue = true;
if (!_queue->start) {
_queue->start = _queue->end = this;
} else if (before) {
if (before != _next) {
_prev = before->_prev;
_next = before;
_next->_prev = this;
if (_prev) {
_prev->_next = this;
}
if (_queue->start->_prev) _queue->start = _queue->start->_prev;
}
} else if (after) {
if (after != _prev) {
_next = after->_next;
_prev = after;
after->_next = this;
if (_next) {
_next->_prev = this;
}
if (_queue->end->_next) _queue->end = _queue->end->_next;
}
} else {
LOG(("Queue Error: _start && !before && !after"));
}
return startLoading();
_downloader->enqueue(this);
}
void FileLoader::loadLocal(const Storage::Cache::Key &key) {
@@ -480,9 +457,8 @@ void FileLoader::cancel(bool fail) {
_file.remove();
}
_data = QByteArray();
removeFromQueue();
const auto queue = _queue;
const auto downloader = _downloader;
const auto sessionGuard = &session();
const auto weak = QPointer<FileLoader>(this);
if (fail) {
@@ -494,16 +470,6 @@ void FileLoader::cancel(bool fail) {
_filename = QString();
_file.setFileName(_filename);
}
// Current cancel() call could be made from ~Main::Session().
crl::on_main(sessionGuard, [=] { LoadNextFromQueue(queue); });
}
void FileLoader::startLoading() {
if ((_queue->queriesCount >= _queue->queriesLimit) || _finished) {
return;
}
loadPart();
}
int FileLoader::currentOffset() const {
@@ -594,7 +560,7 @@ bool FileLoader::finalizeResult() {
Platform::File::PostprocessDownloaded(
QFileInfo(_file).absoluteFilePath());
}
removeFromQueue();
_downloader->remove(this);
if (_localStatus == LocalStatus::NotFound) {
if (const auto key = fileLocationKey()) {
@@ -627,6 +593,7 @@ mtpFileLoader::mtpFileLoader(
uint8 cacheTag)
: FileLoader(
to,
location.dcId(),
size,
type,
toCache,
@@ -635,7 +602,6 @@ mtpFileLoader::mtpFileLoader(
cacheTag)
, _location(location)
, _origin(origin) {
_queue = _downloader->queueForDc(dcId());
}
mtpFileLoader::mtpFileLoader(
@@ -646,6 +612,7 @@ mtpFileLoader::mtpFileLoader(
uint8 cacheTag)
: FileLoader(
QString(),
Global::WebFileDcId(),
size,
UnknownFileLocation,
LoadToCacheAsWell,
@@ -653,7 +620,6 @@ mtpFileLoader::mtpFileLoader(
autoLoading,
cacheTag)
, _location(location) {
_queue = _downloader->queueForDc(dcId());
}
mtpFileLoader::mtpFileLoader(
@@ -664,6 +630,7 @@ mtpFileLoader::mtpFileLoader(
uint8 cacheTag)
: FileLoader(
QString(),
Global::WebFileDcId(),
size,
UnknownFileLocation,
LoadToCacheAsWell,
@@ -671,7 +638,10 @@ mtpFileLoader::mtpFileLoader(
autoLoading,
cacheTag)
, _location(location) {
_queue = _downloader->queueForDc(dcId());
}
mtpFileLoader::~mtpFileLoader() {
cancelRequests();
}
Data::FileOrigin mtpFileLoader::fileOrigin() const {
@@ -703,31 +673,26 @@ void mtpFileLoader::refreshFileReferenceFrom(
makeRequest(offset);
}
bool mtpFileLoader::loadPart() {
if (_finished || _lastComplete || (!_sentRequests.empty() && !_size)) {
return false;
} else if (_size && _nextRequestOffset >= _size) {
return false;
}
bool mtpFileLoader::readyToRequest() const {
return !_finished
&& !_lastComplete
&& (_sentRequests.empty() || _size != 0)
&& (!_size || _nextRequestOffset < _size);
}
makeRequest(_nextRequestOffset);
void mtpFileLoader::loadPart(int dcIndex) {
Expects(readyToRequest());
makeRequest(_nextRequestOffset, dcIndex);
_nextRequestOffset += Storage::kPartSize;
return true;
}
MTP::DcId mtpFileLoader::dcId() const {
if (const auto storage = base::get_if<StorageFileLocation>(&_location)) {
return storage->dcId();
}
return Global::WebFileDcId();
}
mtpFileLoader::RequestData mtpFileLoader::prepareRequest(int offset) const {
mtpFileLoader::RequestData mtpFileLoader::prepareRequest(
int offset,
int dcIndex) const {
auto result = RequestData();
result.dcId = _cdnDcId ? _cdnDcId : dcId();
result.dcIndex = _size
? _downloader->chooseDcIndexForRequest(result.dcId)
: 0;
result.dcIndex = dcIndex;
result.offset = offset;
return result;
}
@@ -796,13 +761,17 @@ mtpRequestId mtpFileLoader::sendRequest(const RequestData &requestData) {
});
}
void mtpFileLoader::makeRequest(int offset) {
void mtpFileLoader::makeRequest(int offset, int dcIndex) {
Expects(!_finished);
auto requestData = prepareRequest(offset);
auto requestData = prepareRequest(offset, dcIndex);
placeSentRequest(sendRequest(requestData), requestData);
}
void mtpFileLoader::makeRequest(int offset) {
makeRequest(offset, _downloader->chooseDcIndexForRequest(dcId()));
}
void mtpFileLoader::requestMoreCdnFileHashes() {
Expects(!_finished);
@@ -1011,7 +980,6 @@ void mtpFileLoader::placeSentRequest(
requestData.dcId,
requestData.dcIndex,
Storage::kPartSize);
++_queue->queriesCount;
_sentRequests.emplace(requestId, requestData);
}
@@ -1024,8 +992,6 @@ int mtpFileLoader::finishSentRequestGetOffset(mtpRequestId requestId) {
requestData.dcId,
requestData.dcIndex,
-Storage::kPartSize);
--_queue->queriesCount;
_sentRequests.erase(it);
return requestData.offset;
@@ -1196,10 +1162,6 @@ std::optional<MediaKey> mtpFileLoader::fileLocationKey() const {
return std::nullopt;
}
mtpFileLoader::~mtpFileLoader() {
cancelRequests();
}
webFileLoader::webFileLoader(
const QString &url,
const QString &to,
@@ -1209,21 +1171,28 @@ webFileLoader::webFileLoader(
: FileLoader(
QString(),
0,
0,
UnknownFileLocation,
LoadToCacheAsWell,
fromCloud,
autoLoading,
cacheTag)
, _url(url) {
_queue = _downloader->queueForWeb();
}
bool webFileLoader::loadPart() {
if (_finished
|| _requestSent
|| _webLoadManager == FinishedWebLoadManager) {
return false;
}
webFileLoader::~webFileLoader() {
markAsNotSent();
}
bool webFileLoader::readyToRequest() const {
return !_finished
&& !_requestSent
&& (_webLoadManager != FinishedWebLoadManager);
}
void webFileLoader::loadPart(int dcIndex) {
Expects(readyToRequest());
if (!_webLoadManager) {
_webLoadMainManager = new WebLoadMainManager();
@@ -1233,9 +1202,8 @@ bool webFileLoader::loadPart() {
_webLoadThread->start();
}
_requestSent = true;
markAsSent();
_webLoadManager->append(this, _url);
return false;
}
int webFileLoader::currentOffset() const {
@@ -1249,6 +1217,7 @@ void webFileLoader::loadProgress(qint64 already, qint64 size) {
}
void webFileLoader::loadFinished(const QByteArray &data) {
markAsNotSent();
if (writeResultPart(0, bytes::make_span(data))) {
if (finalizeResult()) {
notifyAboutProgress();
@@ -1257,6 +1226,7 @@ void webFileLoader::loadFinished(const QByteArray &data) {
}
void webFileLoader::loadError() {
markAsNotSent();
cancel(true);
}
@@ -1269,11 +1239,27 @@ std::optional<MediaKey> webFileLoader::fileLocationKey() const {
}
void webFileLoader::cancelRequests() {
if (!webLoadManager()) return;
if (!webLoadManager()) {
return;
}
webLoadManager()->stop(this);
markAsNotSent();
}
webFileLoader::~webFileLoader() {
void webFileLoader::markAsSent() {
if (_requestSent) {
return;
}
_requestSent = true;
_downloader->requestedAmountIncrement(0, 0, 1);
}
void webFileLoader::markAsNotSent() {
if (!_requestSent) {
return;
}
_requestSent = false;
_downloader->requestedAmountIncrement(0, 0, -1);
}
class webFileLoaderPrivate {
@@ -1372,6 +1358,10 @@ WebLoadManager::WebLoadManager(QThread *thread) {
#endif // OS_MAC_OLD
}
WebLoadManager::~WebLoadManager() {
clear();
}
void WebLoadManager::append(webFileLoader *loader, const QString &url) {
loader->_private = new webFileLoaderPrivate(loader, url);
@@ -1606,10 +1596,6 @@ void WebLoadManager::clear() {
_replies.clear();
}
WebLoadManager::~WebLoadManager() {
clear();
}
void WebLoadMainManager::progress(webFileLoader *loader, qint64 already, qint64 size) {
if (webLoadManager() && webLoadManager()->carries(loader)) {
loader->loadProgress(already, size);