reasonably working

This commit is contained in:
Martin Rotter 2023-01-04 14:11:23 +01:00
parent 10e84709e1
commit e87ecd91f2
7 changed files with 85 additions and 59 deletions

View file

@ -21,9 +21,24 @@
#include <QtConcurrentMap>
FeedDownloader::FeedDownloader()
: QObject(), m_isCacheSynchronizationRunning(false), m_stopCacheSynchronization(false), m_feedsUpdated(0),
m_feedsOriginalCount(0) {
: QObject(), m_isCacheSynchronizationRunning(false), m_stopCacheSynchronization(false) {
qRegisterMetaType<FeedDownloadResults>("FeedDownloadResults");
connect(&m_watcherLookup, &QFutureWatcher<FeedUpdateResult>::resultReadyAt, this, [=](int idx) {
FeedUpdateResult res = m_watcherLookup.resultAt(idx);
emit updateProgress(res.feed, m_watcherLookup.progressValue(), m_watcherLookup.progressMaximum());
});
/*
connect(&m_watcherLookup, &QFutureWatcher<FeedUpdateResult>::progressValueChanged, this, [=](int prog) {
//
});
*/
connect(&m_watcherLookup, &QFutureWatcher<FeedUpdateResult>::finished, this, [=]() {
finalizeUpdate();
});
}
FeedDownloader::~FeedDownloader() {
@ -63,9 +78,6 @@ void FeedDownloader::updateFeeds(const QList<Feed*>& feeds) {
m_results.clear();
m_feeds.clear();
m_feedsOriginalCount = feeds.size();
m_feedsUpdated = 0;
if (feeds.isEmpty()) {
qDebugNN << LOGSEC_FEEDDOWNLOADER << "No feeds to update in worker thread, aborting update.";
}
@ -130,7 +142,7 @@ void FeedDownloader::updateFeeds(const QList<Feed*>& feeds) {
per_acc_states.insert(fd->customId(), per_feed_states);
FeedUpdate fu;
FeedUpdateRequest fu;
fu.account = rt;
fu.feed = fd;
@ -144,7 +156,7 @@ void FeedDownloader::updateFeeds(const QList<Feed*>& feeds) {
}
else {
for (Feed* fd : fds) {
FeedUpdate fu;
FeedUpdateRequest fu;
fu.account = rt;
fu.feed = fd;
@ -162,17 +174,16 @@ void FeedDownloader::updateFeeds(const QList<Feed*>& feeds) {
}
}
std::function<void(const FeedUpdate&)> func = [=](const FeedUpdate& fd) -> void {
updateThreadedFeed(fd);
std::function<FeedUpdateResult(const FeedUpdateRequest&)> func =
[=](const FeedUpdateRequest& fd) -> FeedUpdateResult {
return updateThreadedFeed(fd);
};
QtConcurrent::blockingMap(m_feeds, func);
m_watcherLookup.setFuture(QtConcurrent::mapped(m_feeds, func));
}
finalizeUpdate();
}
void FeedDownloader::updateThreadedFeed(const FeedUpdate& fd) {
FeedUpdateResult FeedDownloader::updateThreadedFeed(const FeedUpdateRequest& fd) {
if (m_erroredAccounts.contains(fd.account)) {
// This feed is errored because its account errored when preparing feed update.
ApplicationException root_ex = m_erroredAccounts.value(fd.account);
@ -184,6 +195,12 @@ void FeedDownloader::updateThreadedFeed(const FeedUpdate& fd) {
}
fd.feed->setLastUpdated(QDateTime::currentDateTimeUtc());
FeedUpdateResult res;
res.feed = fd.feed;
return res;
}
void FeedDownloader::skipFeedUpdateWithError(ServiceRoot* acc, Feed* feed, const ApplicationException& ex) {
@ -195,14 +212,15 @@ void FeedDownloader::skipFeedUpdateWithError(ServiceRoot* acc, Feed* feed, const
else {
feed->setStatus(Feed::Status::OtherError, ex.message());
}
acc->itemChanged({feed});
}
void FeedDownloader::stopRunningUpdate() {
m_stopCacheSynchronization = true;
m_watcherLookup.cancel();
m_watcherLookup.waitForFinished();
m_feeds.clear();
m_feedsOriginalCount = m_feedsUpdated = 0;
}
void FeedDownloader::updateOneFeed(ServiceRoot* acc,
@ -211,8 +229,9 @@ void FeedDownloader::updateOneFeed(ServiceRoot* acc,
const QHash<QString, QStringList>& tagged_messages) {
qlonglong thread_id = qlonglong(QThread::currentThreadId());
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Downloading new messages for feed ID '" << feed->customId() << "' URL: '"
<< feed->source() << "' title: '" << feed->title() << "' in thread: '" << thread_id << "'.";
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Downloading new messages for feed ID" << QUOTE_W_SPACE(feed->customId())
<< "URL:" << QUOTE_W_SPACE(feed->source()) << "title:" << QUOTE_W_SPACE(feed->title()) << "in thread "
<< QUOTE_W_SPACE_DOT(thread_id);
int acc_id = acc->accountId();
QElapsedTimer tmr;
@ -224,9 +243,9 @@ void FeedDownloader::updateOneFeed(ServiceRoot* acc,
: qApp->database()->driver()->connection(QSL("feed_upd_%1").arg(thread_id));
QList<Message> msgs = feed->getParentServiceRoot()->obtainNewMessages(feed, stated_messages, tagged_messages);
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Downloaded " << msgs.size() << " messages for feed ID '" << feed->customId()
<< "' URL: '" << feed->source() << "' title: '" << feed->title() << "' in thread: '"
<< QThread::currentThreadId() << "'. Operation took " << tmr.nsecsElapsed() / 1000 << " microseconds.";
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Downloaded" << NONQUOTE_W_SPACE(msgs.size()) << "messages for feed ID"
<< QUOTE_W_SPACE_COMMA(feed->customId()) << "operation took" << NONQUOTE_W_SPACE(tmr.nsecsElapsed() / 1000)
<< "microseconds.";
bool fix_future_datetimes =
qApp->settings()->value(GROUP(Messages), SETTING(Messages::FixupFutureArticleDateTimes)).toBool();
@ -390,16 +409,11 @@ void FeedDownloader::updateOneFeed(ServiceRoot* acc,
removeDuplicateMessages(msgs);
// Now make sure, that messages are actually stored to SQL in a locked state.
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Saving messages of feed ID '" << feed->customId() << "' URL: '"
<< feed->source() << "' title: '" << feed->title() << "' in thread: '" << QThread::currentThreadId()
<< "'.";
tmr.restart();
auto updated_messages = acc->updateMessages(msgs, feed, false);
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Updating messages in DB took " << tmr.nsecsElapsed() / 1000
<< " microseconds.";
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Updating messages in DB took" << NONQUOTE_W_SPACE(tmr.nsecsElapsed() / 1000)
<< "microseconds.";
if (feed->status() != Feed::Status::NewMessages) {
feed->setStatus(updated_messages.first > 0 || updated_messages.second > 0 ? Feed::Status::NewMessages
@ -426,17 +440,14 @@ void FeedDownloader::updateOneFeed(ServiceRoot* acc,
feed->setStatus(Feed::Status::OtherError, app_ex.message());
}
// feed->getParentServiceRoot()->itemChanged({feed});
m_feedsUpdated++;
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Made progress in feed updates, total feeds count " << m_feedsUpdated << "/"
<< m_feedsOriginalCount << " (id of feed is " << feed->id() << ").";
// emit updateProgress(feed, m_feedsUpdated, m_feedsOriginalCount);
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Made progress in feed updates, total feeds count "
<< m_watcherLookup.progressValue() + 1 << "/" << m_feeds.size() << " (id of feed is " << feed->id() << ").";
}
void FeedDownloader::finalizeUpdate() {
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Finished feed updates in thread: '" << QThread::currentThreadId() << "'.";
qDebugNN << LOGSEC_FEEDDOWNLOADER << "Finished feed updates in thread"
<< QUOTE_W_SPACE_DOT(QThread::currentThreadId());
m_results.sort();
// Update of feeds has finished.

View file

@ -5,6 +5,7 @@
#include <QObject>
#include <QFutureWatcher>
#include <QPair>
#include "core/message.h"
@ -29,13 +30,17 @@ class FeedDownloadResults {
QList<QPair<Feed*, int>> m_updatedFeeds;
};
struct FeedUpdate {
struct FeedUpdateRequest {
Feed* feed = nullptr;
ServiceRoot* account = nullptr;
QHash<ServiceRoot::BagOfMessages, QStringList> stated_messages;
QHash<QString, QStringList> tagged_messages;
};
struct FeedUpdateResult {
Feed* feed = nullptr;
};
// This class offers means to "update" feeds and "special" categories.
// NOTE: This class is used within separate thread.
class FeedDownloader : public QObject {
@ -68,16 +73,15 @@ class FeedDownloader : public QObject {
void finalizeUpdate();
void removeDuplicateMessages(QList<Message>& messages);
void updateThreadedFeed(const FeedUpdate& fd);
FeedUpdateResult updateThreadedFeed(const FeedUpdateRequest& fd);
private:
bool m_isCacheSynchronizationRunning;
bool m_stopCacheSynchronization;
QHash<ServiceRoot*, ApplicationException> m_erroredAccounts;
QList<FeedUpdate> m_feeds = {};
QList<FeedUpdateRequest> m_feeds = {};
QFutureWatcher<FeedUpdateResult> m_watcherLookup;
FeedDownloadResults m_results;
int m_feedsUpdated;
int m_feedsOriginalCount;
};
#endif // FEEDDOWNLOADER_H

View file

@ -10,9 +10,20 @@
#include <QRegularExpression>
#include <QSqlError>
#include <QSqlQuery>
#include <QThread>
DatabaseDriver::DatabaseDriver(QObject* parent) : QObject(parent) {}
QSqlDatabase DatabaseDriver::threadSafeConnection(const QString& connection_name, DesiredStorageType desired_type) {
qlonglong thread_id = qlonglong(QThread::currentThreadId());
bool is_main_thread = QThread::currentThread() == qApp->thread();
QSqlDatabase database =
connection(is_main_thread ? connection_name : QSL("db_connection_%1").arg(thread_id), desired_type);
return database;
}
void DatabaseDriver::updateDatabaseSchema(QSqlQuery& query,
int source_db_schema_version,
const QString& database_name) {

View file

@ -9,25 +9,21 @@
#include <QSqlQuery>
class DatabaseDriver : public QObject {
Q_OBJECT
Q_OBJECT
public:
// Describes available types of database backend.
enum class DriverType {
SQLite,
MySQL
};
enum class DriverType { SQLite, MySQL };
// Describes what type of database user wants.
enum class DesiredStorageType {
StrictlyFileBased,
StrictlyInMemory,
FromSettings
};
enum class DesiredStorageType { StrictlyFileBased, StrictlyInMemory, FromSettings };
explicit DatabaseDriver(QObject* parent = nullptr);
QSqlDatabase threadSafeConnection(const QString& connection_name,
DatabaseDriver::DesiredStorageType desired_type =
DatabaseDriver::DesiredStorageType::FromSettings);
// API.
virtual QString location() const = 0;
virtual QString humanDriverType() const = 0;
@ -43,19 +39,17 @@ class DatabaseDriver : public QObject {
virtual bool finishRestoration() = 0;
virtual qint64 databaseDataSize() = 0;
virtual QSqlDatabase connection(const QString& connection_name,
DatabaseDriver::DesiredStorageType desired_type = DatabaseDriver::DesiredStorageType::FromSettings) = 0;
DatabaseDriver::DesiredStorageType desired_type =
DatabaseDriver::DesiredStorageType::FromSettings) = 0;
protected:
void updateDatabaseSchema(QSqlQuery& query,
int source_db_schema_version,
const QString& database_name = {});
void updateDatabaseSchema(QSqlQuery& query, int source_db_schema_version, const QString& database_name = {});
void setSchemaVersion(QSqlQuery& query, int new_schema_version, bool empty_table);
QStringList prepareScript(const QString& base_sql_folder,
const QString& sql_file,
const QString& database_name = {});
};
#endif // DATABASEDRIVER_H

View file

@ -218,7 +218,7 @@ Application::Application(const QString& id, int& argc, char** argv, const QStrin
auto ideal_th_count = QThread::idealThreadCount();
if (ideal_th_count > 1) {
QThreadPool::globalInstance()->setMaxThreadCount(2 * ideal_th_count);
QThreadPool::globalInstance()->setMaxThreadCount((std::min)(128, 2 * ideal_th_count));
}
qDebugNN << LOGSEC_CORE << "OpenSSL version:" << QUOTE_W_SPACE_DOT(QSslSocket::sslLibraryVersionString());

View file

@ -122,7 +122,7 @@ void FeedReader::initializeFeedDownloader() {
connect(m_feedDownloaderThread, &QThread::finished, m_feedDownloaderThread, &QThread::deleteLater);
connect(m_feedDownloaderThread, &QThread::finished, m_feedDownloader, &FeedDownloader::deleteLater);
connect(m_feedDownloader, &FeedDownloader::updateFinished, this, &FeedReader::feedUpdatesFinished);
connect(m_feedDownloader, &FeedDownloader::updateFinished, this, &FeedReader::onFeedUpdatesFinished);
connect(m_feedDownloader, &FeedDownloader::updateProgress, this, &FeedReader::feedUpdatesProgress);
connect(m_feedDownloader, &FeedDownloader::updateStarted, this, &FeedReader::feedUpdatesStarted);
connect(m_feedDownloader, &FeedDownloader::updateFinished, qApp->feedUpdateLock(), &Mutex::unlock);
@ -348,6 +348,11 @@ void FeedReader::executeNextAutoUpdate() {
}
}
void FeedReader::onFeedUpdatesFinished(FeedDownloadResults updated_feeds) {
m_feedsModel->reloadWholeLayout();
emit feedUpdatesFinished(updated_feeds);
}
QList<MessageFilter*> FeedReader::messageFilters() const {
return m_messageFilters;
}

View file

@ -72,6 +72,7 @@ class RSSGUARD_DLLSPEC FeedReader : public QObject {
private slots:
void executeNextAutoUpdate();
void onFeedUpdatesFinished(FeedDownloadResults updated_feeds);
signals:
void feedUpdatesStarted();