2
0
mirror of https://github.com/telegramdesktop/tdesktop synced 2025-09-02 15:35:51 +00:00

Improve queued by pts updates handling.

The updates are ordered by pts and applied in the correct order.
Also some pts-dependent updates handling was moved to ApiWrap.
This commit is contained in:
John Preston
2017-07-14 14:54:47 +03:00
parent 101ec9a1c1
commit 949104d879
6 changed files with 273 additions and 223 deletions

View File

@@ -1045,18 +1045,14 @@ void MainWidget::deleteHistoryAfterLeave(PeerData *peer, const MTPUpdates &updat
void MainWidget::deleteHistoryPart(DeleteHistoryRequest request, const MTPmessages_AffectedHistory &result) {
auto peer = request.peer;
const auto &d(result.c_messages_affectedHistory());
auto &d = result.c_messages_affectedHistory();
if (peer && peer->isChannel()) {
if (peer->asChannel()->ptsUpdated(d.vpts.v, d.vpts_count.v)) {
peer->asChannel()->ptsApplySkippedUpdates();
}
peer->asChannel()->ptsUpdateAndApply(d.vpts.v, d.vpts_count.v);
} else {
if (ptsUpdated(d.vpts.v, d.vpts_count.v)) {
ptsApplySkippedUpdates();
}
ptsUpdateAndApply(d.vpts.v, d.vpts_count.v);
}
int32 offset = d.voffset.v;
auto offset = d.voffset.v;
if (offset <= 0) {
cRefReportSpamStatuses().remove(peer->id);
Local::writeReportSpamStatuses();
@@ -1148,15 +1144,13 @@ void MainWidget::deleteAllFromUser(ChannelData *channel, UserData *from) {
}
void MainWidget::deleteAllFromUserPart(DeleteAllFromUserParams params, const MTPmessages_AffectedHistory &result) {
const auto &d(result.c_messages_affectedHistory());
if (params.channel->ptsUpdated(d.vpts.v, d.vpts_count.v)) {
params.channel->ptsApplySkippedUpdates();
}
auto &d = result.c_messages_affectedHistory();
params.channel->ptsUpdateAndApply(d.vpts.v, d.vpts_count.v);
int32 offset = d.voffset.v;
auto offset = d.voffset.v;
if (offset > 0) {
MTP::send(MTPchannels_DeleteUserHistory(params.channel->inputChannel, params.from->inputUser), rpcDone(&MainWidget::deleteAllFromUserPart, params));
} else if (History *h = App::historyLoaded(params.channel)) {
} else if (auto h = App::historyLoaded(params.channel)) {
if (!h->lastMsg) {
checkPeerHistory(params.channel);
}
@@ -1714,17 +1708,14 @@ void MainWidget::readRequestDone(PeerData *peer) {
}
void MainWidget::messagesAffected(PeerData *peer, const MTPmessages_AffectedMessages &result) {
const auto &d(result.c_messages_affectedMessages());
auto &d = result.c_messages_affectedMessages();
if (peer && peer->isChannel()) {
if (peer->asChannel()->ptsUpdated(d.vpts.v, d.vpts_count.v)) {
peer->asChannel()->ptsApplySkippedUpdates();
}
peer->asChannel()->ptsUpdateAndApply(d.vpts.v, d.vpts_count.v);
} else {
if (ptsUpdated(d.vpts.v, d.vpts_count.v)) {
ptsApplySkippedUpdates();
}
ptsUpdateAndApply(d.vpts.v, d.vpts_count.v);
}
if (History *h = App::historyLoaded(peer ? peer->id : 0)) {
if (auto h = App::historyLoaded(peer ? peer->id : 0)) {
if (!h->lastMsg) {
checkPeerHistory(peer);
}
@@ -3869,20 +3860,16 @@ void MainWidget::failDifferenceStartTimerFor(ChannelData *channel) {
}
}
bool MainWidget::ptsUpdated(int32 pts, int32 ptsCount) { // return false if need to save that update and apply later
return _ptsWaiter.updated(0, pts, ptsCount);
bool MainWidget::ptsUpdateAndApply(int32 pts, int32 ptsCount, const MTPUpdates &updates) {
return _ptsWaiter.updateAndApply(nullptr, pts, ptsCount, updates);
}
bool MainWidget::ptsUpdated(int32 pts, int32 ptsCount, const MTPUpdates &updates) {
return _ptsWaiter.updated(0, pts, ptsCount, updates);
bool MainWidget::ptsUpdateAndApply(int32 pts, int32 ptsCount, const MTPUpdate &update) {
return _ptsWaiter.updateAndApply(nullptr, pts, ptsCount, update);
}
bool MainWidget::ptsUpdated(int32 pts, int32 ptsCount, const MTPUpdate &update) {
return _ptsWaiter.updated(0, pts, ptsCount, update);
}
void MainWidget::ptsApplySkippedUpdates() {
return _ptsWaiter.applySkippedUpdates(0);
bool MainWidget::ptsUpdateAndApply(int32 pts, int32 ptsCount) {
return _ptsWaiter.updateAndApply(nullptr, pts, ptsCount);
}
void MainWidget::feedDifference(const MTPVector<MTPUser> &users, const MTPVector<MTPChat> &chats, const MTPVector<MTPMessage> &msgs, const MTPVector<MTPUpdate> &other) {
@@ -4760,20 +4747,14 @@ void MainWidget::feedUpdates(const MTPUpdates &updates, uint64 randomId) {
MTP_LOG(0, ("getDifference { good - getting user for updateShortMessage }%1").arg(cTestMode() ? " TESTMODE" : ""));
return getDifference();
}
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, updates)) {
return;
if (ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, updates)) {
// We could've added an item.
// Better would be for history to be subscribed to new messages.
_history->peerMessagesUpdated();
// Update date as well.
updSetState(0, d.vdate.v, updQts, updSeq);
}
// update before applying skipped
auto flags = mtpCastFlags(d.vflags.v) | MTPDmessage::Flag::f_from_id;
auto item = App::histories().addNewMessage(MTP_message(MTP_flags(flags), d.vid, d.is_out() ? MTP_int(AuthSession::CurrentUserId()) : d.vuser_id, MTP_peerUser(d.is_out() ? d.vuser_id : MTP_int(AuthSession::CurrentUserId())), d.vfwd_from, d.vvia_bot_id, d.vreply_to_msg_id, d.vdate, d.vmessage, MTP_messageMediaEmpty(), MTPnullMarkup, d.has_entities() ? d.ventities : MTPnullEntities, MTPint(), MTPint()), NewMessageUnread);
if (item) {
_history->peerMessagesUpdated(item->history()->peer->id);
}
ptsApplySkippedUpdates();
updSetState(0, d.vdate.v, updQts, updSeq);
} break;
case mtpc_updateShortChatMessage: {
@@ -4788,20 +4769,14 @@ void MainWidget::feedUpdates(const MTPUpdates &updates, uint64 randomId) {
if (noFrom && App::api()) App::api()->requestFullPeer(App::chatLoaded(d.vchat_id.v));
return getDifference();
}
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, updates)) {
return;
if (ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, updates)) {
// We could've added an item.
// Better would be for history to be subscribed to new messages.
_history->peerMessagesUpdated();
// Update date as well.
updSetState(0, d.vdate.v, updQts, updSeq);
}
// update before applying skipped
auto flags = mtpCastFlags(d.vflags.v) | MTPDmessage::Flag::f_from_id;
auto item = App::histories().addNewMessage(MTP_message(MTP_flags(flags), d.vid, d.vfrom_id, MTP_peerChat(d.vchat_id), d.vfwd_from, d.vvia_bot_id, d.vreply_to_msg_id, d.vdate, d.vmessage, MTP_messageMediaEmpty(), MTPnullMarkup, d.has_entities() ? d.ventities : MTPnullEntities, MTPint(), MTPint()), NewMessageUnread);
if (item) {
_history->peerMessagesUpdated(item->history()->peer->id);
}
ptsApplySkippedUpdates();
updSetState(0, d.vdate.v, updQts, updSeq);
} break;
case mtpc_updateShortSentMessage: {
@@ -4825,13 +4800,10 @@ void MainWidget::feedUpdates(const MTPUpdates &updates, uint64 randomId) {
}
}
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, updates)) {
return;
if (ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, updates)) {
// Update date as well.
updSetState(0, d.vdate.v, updQts, updSeq);
}
// update before applying skipped
ptsApplySkippedUpdates();
updSetState(0, d.vdate.v, updQts, updSeq);
} break;
case mtpc_updatesTooLong: {
@@ -4855,24 +4827,11 @@ void MainWidget::feedUpdate(const MTPUpdate &update) {
return getDifference();
}
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
if (ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update)) {
// We could've added an item.
// Better would be for history to be subscribed to new messages.
_history->peerMessagesUpdated();
}
// update before applying skipped
bool needToAdd = true;
if (d.vmessage.type() == mtpc_message) { // index forwarded messages to links _overview
if (App::checkEntitiesAndViewsUpdate(d.vmessage.c_message())) { // already in blocks
LOG(("Skipping message, because it is already in blocks!"));
needToAdd = false;
}
}
if (needToAdd) {
if (auto item = App::histories().addNewMessage(d.vmessage, NewMessageUnread)) {
_history->peerMessagesUpdated(item->history()->peer->id);
}
}
ptsApplySkippedUpdates();
} break;
case mtpc_updateMessageID: {
@@ -4906,87 +4865,42 @@ void MainWidget::feedUpdate(const MTPUpdate &update) {
case mtpc_updateReadMessagesContents: {
auto &d = update.c_updateReadMessagesContents();
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
}
// update before applying skipped
auto &v = d.vmessages.v;
for (int32 i = 0, l = v.size(); i < l; ++i) {
if (auto item = App::histItemById(NoChannel, v.at(i).v)) {
if (item->isMediaUnread()) {
item->markMediaRead();
Ui::repaintHistoryItem(item);
if (item->out() && item->history()->peer->isUser()) {
auto when = requestingDifference() ? 0 : unixtime();
item->history()->peer->asUser()->madeAction(when);
}
}
}
}
ptsApplySkippedUpdates();
ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update);
} break;
case mtpc_updateReadHistoryInbox: {
auto &d = update.c_updateReadHistoryInbox();
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
}
// update before applying skipped
App::feedInboxRead(peerFromMTP(d.vpeer), d.vmax_id.v);
ptsApplySkippedUpdates();
ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update);
} break;
case mtpc_updateReadHistoryOutbox: {
auto &d = update.c_updateReadHistoryOutbox();
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
}
// update before applying skipped
auto peerId = peerFromMTP(d.vpeer);
auto when = requestingDifference() ? 0 : unixtime();
App::feedOutboxRead(peerId, d.vmax_id.v, when);
if (_history->peer() && _history->peer()->id == peerId) {
if (ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update)) {
// We could've updated the double checks.
// Better would be for history to be subscribed to outbox read events.
_history->update();
}
ptsApplySkippedUpdates();
} break;
case mtpc_updateWebPage: {
auto &d = update.c_updateWebPage();
// update web page anyway
// Update web page anyway.
App::feedWebPage(d.vwebpage);
_history->updatePreview();
webPagesOrGamesUpdate();
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
}
ptsApplySkippedUpdates();
ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update);
} break;
case mtpc_updateDeleteMessages: {
auto &d = update.c_updateDeleteMessages();
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
if (ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update)) {
// We could've removed some items.
// Better would be for history to be subscribed to removed messages.
_history->peerMessagesUpdated();
}
// update before applying skipped
App::feedWereDeleted(NoChannel, d.vmessages.v);
_history->peerMessagesUpdated();
ptsApplySkippedUpdates();
} break;
case mtpc_updateUserTyping: {
@@ -5260,26 +5174,17 @@ void MainWidget::feedUpdate(const MTPUpdate &update) {
if (channel && !_handlingChannelDifference) {
if (channel->ptsRequesting()) { // skip global updates while getting channel difference
return;
} else if (!channel->ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
} else if (channel->ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update)) {
// We could've added an item.
// Better would be for history to be subscribed to new messages.
_history->peerMessagesUpdated();
}
}
} else {
App::api()->applyUpdateNoPtsCheck(update);
// update before applying skipped
bool needToAdd = true;
if (d.vmessage.type() == mtpc_message) { // index forwarded messages to links _overview
if (App::checkEntitiesAndViewsUpdate(d.vmessage.c_message())) { // already in blocks
LOG(("Skipping message, because it is already in blocks!"));
needToAdd = false;
}
}
if (needToAdd) {
if (auto item = App::histories().addNewMessage(d.vmessage, NewMessageUnread)) {
_history->peerMessagesUpdated(item->history()->peer->id);
}
}
if (channel && !_handlingChannelDifference) {
channel->ptsApplySkippedUpdates();
// We could've added an item.
// Better would be for history to be subscribed to new messages.
_history->peerMessagesUpdated();
}
} break;
@@ -5290,30 +5195,17 @@ void MainWidget::feedUpdate(const MTPUpdate &update) {
if (channel && !_handlingChannelDifference) {
if (channel->ptsRequesting()) { // skip global updates while getting channel difference
return;
} else if (!channel->ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
} else {
channel->ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update);
}
}
// update before applying skipped
App::updateEditedMessage(d.vmessage);
if (channel && !_handlingChannelDifference) {
channel->ptsApplySkippedUpdates();
} else {
App::api()->applyUpdateNoPtsCheck(update);
}
} break;
case mtpc_updateEditMessage: {
auto &d = update.c_updateEditMessage();
if (!ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
}
// update before applying skipped
App::updateEditedMessage(d.vmessage);
ptsApplySkippedUpdates();
ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update);
} break;
case mtpc_updateChannelPinnedMessage: {
@@ -5356,13 +5248,11 @@ void MainWidget::feedUpdate(const MTPUpdate &update) {
if (channel && !_handlingChannelDifference) {
if (channel->ptsRequesting()) { // skip global updates while getting channel difference
return;
} else if (!channel->ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
} else {
channel->ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update);
}
}
if (channel && !_handlingChannelDifference) {
channel->ptsApplySkippedUpdates();
} else {
App::api()->applyUpdateNoPtsCheck(update);
}
} break;
@@ -5373,17 +5263,17 @@ void MainWidget::feedUpdate(const MTPUpdate &update) {
if (channel && !_handlingChannelDifference) {
if (channel->ptsRequesting()) { // skip global updates while getting channel difference
return;
} else if (!channel->ptsUpdated(d.vpts.v, d.vpts_count.v, update)) {
return;
} else if (channel->ptsUpdateAndApply(d.vpts.v, d.vpts_count.v, update)) {
// We could've removed some items.
// Better would be for history to be subscribed to removed messages.
_history->peerMessagesUpdated();
}
}
} else {
// We could've removed some items.
// Better would be for history to be subscribed to removed messages.
_history->peerMessagesUpdated();
// update before applying skipped
App::feedWereDeleted(d.vchannel_id.v, d.vmessages.v);
_history->peerMessagesUpdated();
if (channel && !_handlingChannelDifference) {
channel->ptsApplySkippedUpdates();
App::api()->applyUpdateNoPtsCheck(update);
}
} break;