Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3156343
Avoid passing QSqlDatabase by mutable reference
uklotzde May 28, 2017
16f7bf3
Define a constant for the embedded schema file
uklotzde Jun 2, 2017
8e76d17
Reading values from SettingsDAO should be const
uklotzde Jun 2, 2017
72ff095
Make SchemaManager stateful and improve logging
uklotzde Jun 2, 2017
152314f
Reuse DB connection from TrackCollection in SchemaManagerTest
uklotzde Jun 2, 2017
82bf436
Improve database logging
uklotzde Jun 3, 2017
63d86cd
Improve error handling and logging when upgrading the database schema
uklotzde Jun 3, 2017
64254ae
Remove undefined function and fix const signature
uklotzde Jun 3, 2017
ff92585
Strip and reorder members of TrackCollection
uklotzde Jun 3, 2017
fcbdbd7
Cleanup library scanner code
uklotzde Jun 4, 2017
fbfb3a0
Reuse temporary Mixxx DB in test class
uklotzde Jun 4, 2017
e0817dc
Simplify LibraryScannerTest
uklotzde Jun 4, 2017
bb5534f
Do not store reference in member variable
uklotzde Jun 4, 2017
f6b03c8
Initialize (and upgrade) the database schema separately
uklotzde Jun 4, 2017
8aae9be
Add missing initialization of database
uklotzde Jun 5, 2017
857205d
Consistently wire DAOs to the database
uklotzde Jun 5, 2017
9bd42ce
Move database artifacts into separate folder
uklotzde Jun 5, 2017
200df8a
Move Mixxx DB code from TrackCollection into separate class
uklotzde Jun 5, 2017
f689c43
Change naming
uklotzde Jun 5, 2017
97e802f
Cleanup some definitions
uklotzde Jun 6, 2017
ad48f8b
Reuse SqlStorage interface for TrackCollection
uklotzde Jun 6, 2017
62b6f5f
Move DbConnection into mixxx namespace
uklotzde Jun 6, 2017
ca57051
Use mixxx::Logger for CrateStorage
uklotzde Jun 6, 2017
b81976f
Delete unused transaction parameters
uklotzde Jun 6, 2017
ef3f61a
Explicitly provide database connection for schema init/upgrade
uklotzde Jun 6, 2017
7ba6823
Explicitly open/close database connections
uklotzde Jun 7, 2017
2295dc6
Allow to use a custom name for the database connection
uklotzde Jun 7, 2017
8aca140
Use a managed database connection for library scanner
uklotzde Jun 7, 2017
2004ad0
Use a managed database connection(s) for waveform analyzer(s)
uklotzde Jun 7, 2017
eeb8661
Delete undefined member functions from AnalysisDao
uklotzde Jun 7, 2017
83a1e29
Define a constant for [Library] config group
uklotzde Jun 7, 2017
f861db2
Don't repair database without permission by the user
uklotzde Jun 7, 2017
2d9480e
Add missing #include directive
uklotzde Jun 7, 2017
069b8fc
Use mixxx::Logger in AnalyzerQueue
uklotzde Jun 8, 2017
3753bc5
Move DAO and database connection into correct thread
uklotzde Jun 8, 2017
45101bf
Generate thread ids in a thread-safe manner
uklotzde Jun 8, 2017
b120d72
Delete unused parameters
uklotzde Jun 9, 2017
38fb16b
Simplify the creation of analyzer queue threads
uklotzde Jun 12, 2017
6cb6c76
Move AnalysisDao back into waveform analyzer
uklotzde Jun 9, 2017
26d23b2
Minor edits
uklotzde Jun 9, 2017
8b0e949
Delete duplicate code
uklotzde Jun 9, 2017
077b56f
Thread-safe instance counter for both AnalyzerQueue and LibScanner
uklotzde Jun 9, 2017
4243f6f
Repository is-a component of Library
uklotzde Jun 9, 2017
2e1d15d
AnalysisFeature (soon) needs a Library parent
uklotzde Jun 9, 2017
40ccb3d
Manage a pool of thread-local database connections
uklotzde Jun 10, 2017
7eb8c8d
Encapsulate database connection params
uklotzde Jun 10, 2017
29afb3f
Reduce dependencies between library components and database connections
uklotzde Jun 10, 2017
84b3e3e
Eliminate more explicit database() calls
uklotzde Jun 10, 2017
c7f0cda
Delete obsolete #include directives
uklotzde Jun 12, 2017
506891d
Delete unused header file
uklotzde Jun 12, 2017
518d7a6
Delete unused function and reformatting
uklotzde Jun 12, 2017
7dc9434
Fix include directives
uklotzde Jun 12, 2017
f11ea9b
Change naming of local variables in anonymous namespace
uklotzde Jun 13, 2017
7a30111
Renaming: 'Repository' -> 'MixxxDb'
uklotzde Jun 13, 2017
b426969
Rename to ThreadLocalScoped and explain how the pool works
uklotzde Jun 14, 2017
79ecbf9
Introduce DbConnectionPooled for thread-local, pooled connections
uklotzde Jun 15, 2017
169ee10
Split DbConnectionPool API into roles: "Pooler" vs. "Pooled"
uklotzde Jun 15, 2017
dacbaaa
Add some comments on how to use the new connection pool API classes
uklotzde Jun 15, 2017
2a66897
Avoid dependency between "Pooler" and "Pooled" just for convenience
uklotzde Jun 15, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion build/depends.py
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,10 @@ def sources(self, build):
"widget/wlibrarytableview.cpp",
"widget/wanalysislibrarytableview.cpp",
"widget/wlibrarytextbrowser.cpp",

"database/mixxxdb.cpp",
"database/schemamanager.cpp",

"library/trackcollection.cpp",
"library/basesqltablemodel.cpp",
"library/basetrackcache.cpp",
Expand Down Expand Up @@ -969,7 +973,6 @@ def sources(self, build):
"library/dao/autodjcratesdao.cpp",

"library/librarycontrol.cpp",
"library/schemamanager.cpp",
"library/songdownloader.cpp",
"library/starrating.cpp",
"library/stardelegate.cpp",
Expand Down Expand Up @@ -1117,6 +1120,9 @@ def sources(self, build):
"util/movinginterquartilemean.cpp",
"util/console.cpp",
"util/db/dbconnection.cpp",
"util/db/dbconnectionpool.cpp",
"util/db/dbconnectionpooler.cpp",
"util/db/dbconnectionpooled.cpp",
"util/db/dbid.cpp",
"util/db/fwdsqlquery.cpp",
"util/db/fwdsqlqueryselectresult.cpp",
Expand Down
167 changes: 71 additions & 96 deletions src/analyzer/analyzerqueue.cpp
Original file line number Diff line number Diff line change
@@ -1,25 +1,21 @@
#include "analyzer/analyzerqueue.h"

#include <typeinfo>

#include <QtDebug>
#include <QMutexLocker>

#ifdef __VAMP__
#include "analyzer/analyzerbeats.h"
#include "analyzer/analyzerkey.h"
#endif
#include "analyzer/analyzergain.h"
#include "analyzer/analyzerebur128.h"
#include "analyzer/analyzerwaveform.h"
#include "library/trackcollection.h"
#include "mixer/playerinfo.h"
#include "sources/soundsourceproxy.h"
#include "track/track.h"
#include "util/compatibility.h"
#include "util/db/dbconnectionpooler.h"
#include "util/event.h"
#include "util/timer.h"
#include "util/trace.h"
#include "util/logger.h"

// Measured in 0.1%,
// 0 for no progress during finalize
Expand All @@ -28,46 +24,52 @@
#define FINALIZE_PROMILLE 1

namespace {
// Analysis is done in blocks.
// We need to use a smaller block size, because on Linux the AnalyzerQueue
// can starve the CPU of its resources, resulting in xruns. A block size
// of 4096 frames per block seems to do fine.
const SINT kAnalysisChannels = mixxx::AudioSource::kChannelCountStereo;
const SINT kAnalysisFramesPerBlock = 4096;
const SINT kAnalysisSamplesPerBlock =
kAnalysisFramesPerBlock * kAnalysisChannels;

mixxx::Logger kLogger("AnalyzerQueue");

// Analysis is done in blocks.
// We need to use a smaller block size, because on Linux the AnalyzerQueue
// can starve the CPU of its resources, resulting in xruns. A block size
// of 4096 frames per block seems to do fine.
const SINT kAnalysisChannels = mixxx::AudioSource::kChannelCountStereo;
const SINT kAnalysisFramesPerBlock = 4096;
const SINT kAnalysisSamplesPerBlock =
kAnalysisFramesPerBlock * kAnalysisChannels;

QAtomicInt s_instanceCounter(0);

} // anonymous namespace

AnalyzerQueue::AnalyzerQueue(TrackCollection* pTrackCollection)
: m_aq(),
AnalyzerQueue::AnalyzerQueue(
mixxx::DbConnectionPoolPtr pDbConnectionPool,
const UserSettingsPointer& pConfig,
Mode mode)
: m_pDbConnectionPool(std::move(pDbConnectionPool)),
m_exit(false),
m_aiCheckPriorities(false),
m_sampleBuffer(kAnalysisSamplesPerBlock),
m_tioq(),
m_qm(),
m_qwait(),
m_queue_size(0) {
Q_UNUSED(pTrackCollection);

if (mode != Mode::WithoutWaveform) {
m_pAnalyzers.push_back(std::make_unique<AnalyzerWaveform>(pConfig));
}
m_pAnalyzers.push_back(std::make_unique<AnalyzerGain>(pConfig));
m_pAnalyzers.push_back(std::make_unique<AnalyzerEbur128>(pConfig));
#ifdef __VAMP__
m_pAnalyzers.push_back(std::make_unique<AnalyzerBeats>(pConfig));
m_pAnalyzers.push_back(std::make_unique<AnalyzerKey>(pConfig));
#endif

connect(this, SIGNAL(updateProgress()),
this, SLOT(slotUpdateProgress()));

start(QThread::LowPriority);
}

AnalyzerQueue::~AnalyzerQueue() {
stop();
m_progressInfo.sema.release();
wait(); //Wait until thread has actually stopped before proceeding.

QListIterator<Analyzer*> it(m_aq);
while (it.hasNext()) {
Analyzer* an = it.next();
//qDebug() << "AnalyzerQueue: deleting " << typeid(an).name();
delete an;
}
//qDebug() << "AnalyzerQueue::~AnalyzerQueue()";
}

void AnalyzerQueue::addAnalyzer(Analyzer* an) {
m_aq.push_back(an);
}

// This is called from the AnalyzerQueue thread
Expand Down Expand Up @@ -95,10 +97,9 @@ bool AnalyzerQueue::isLoadedTrackWaiting(TrackPointer analysingTrack) {
int progress = pTrack->getAnalyzerProgress();
if (progress < 0) {
// Load stored analysis
QListIterator<Analyzer*> ita(m_aq);
bool processTrack = false;
while (ita.hasNext()) {
if (!ita.next()->isDisabledOrLoadStoredSuccess(pTrack)) {
for (auto const& pAnalyzer: m_pAnalyzers) {
if (!pAnalyzer->isDisabledOrLoadStoredSuccess(pTrack)) {
processTrack = true;
}
}
Expand Down Expand Up @@ -154,7 +155,7 @@ TrackPointer AnalyzerQueue::dequeueNextBlocking() {
}
// Prioritize tracks that are loaded.
if (info.isTrackLoaded(pTrack)) {
qDebug() << "Prioritizing" << pTrack->getTitle() << pTrack->getLocation();
kLogger.debug() << "Prioritizing" << pTrack->getTitle() << pTrack->getLocation();
pLoadTrack = pTrack;
it.remove();
break;
Expand All @@ -169,7 +170,7 @@ TrackPointer AnalyzerQueue::dequeueNextBlocking() {
m_qm.unlock();

if (pLoadTrack) {
qDebug() << "Analyzing" << pLoadTrack->getTitle() << pLoadTrack->getLocation();
kLogger.debug() << "Analyzing" << pLoadTrack->getTitle() << pLoadTrack->getLocation();
}
// pTrack might be NULL, up to the caller to check.
return pLoadTrack;
Expand Down Expand Up @@ -206,20 +207,16 @@ bool AnalyzerQueue::doAnalysis(TrackPointer tio, mixxx::AudioSourcePointer pAudi
// the full block size.
if (kAnalysisFramesPerBlock == framesRead) {
// Complete analysis block of audio samples has been read.
QListIterator<Analyzer*> it(m_aq);
while (it.hasNext()) {
Analyzer* an = it.next();
//qDebug() << typeid(*an).name() << ".process()";
an->process(m_sampleBuffer.data(), m_sampleBuffer.size());
//qDebug() << "Done " << typeid(*an).name() << ".process()";
for (auto const& pAnalyzer: m_pAnalyzers) {
pAnalyzer->process(m_sampleBuffer.data(), m_sampleBuffer.size());
}
} else {
// Partial analysis block of audio samples has been read.
// This should only happen at the end of an audio stream,
// otherwise a decoding error must have occurred.
if (frameIndex < pAudioSource->getMaxFrameIndex()) {
// EOF not reached -> Maybe a corrupt file?
qWarning() << "Failed to read sample data from file:"
kLogger.warning() << "Failed to read sample data from file:"
<< tio->getLocation()
<< "@" << frameIndex;
if (0 >= framesRead) {
Expand Down Expand Up @@ -258,7 +255,7 @@ bool AnalyzerQueue::doAnalysis(TrackPointer tio, mixxx::AudioSourcePointer pAudi
// has something new entered the queue?
if (m_aiCheckPriorities.fetchAndStoreAcquire(false)) {
if (isLoadedTrackWaiting(tio)) {
qDebug() << "Interrupting analysis to give preference to a loaded track.";
kLogger.debug() << "Interrupting analysis to give preference to a loaded track.";
dieflag = true;
cancelled = true;
}
Expand Down Expand Up @@ -286,12 +283,29 @@ void AnalyzerQueue::stop() {
}

void AnalyzerQueue::run() {
unsigned static id = 0; // the id of this thread, for debugging purposes
QThread::currentThread()->setObjectName(QString("AnalyzerQueue %1").arg(++id));

// If there are no analyzers, don't waste time running.
if (m_aq.size() == 0)
if (m_pAnalyzers.empty()) {
return;
}

const int instanceId = s_instanceCounter.fetchAndAddAcquire(1) + 1;
QThread::currentThread()->setObjectName(QString("AnalyzerQueue %1").arg(instanceId));

kLogger.debug() << "Entering thread";

execThread();

kLogger.debug() << "Exiting thread";
}

void AnalyzerQueue::execThread() {
const mixxx::DbConnectionPooler dbConnection(m_pDbConnectionPool);
if (!dbConnection) {
kLogger.warning()
<< "Failed to open database connection for analyzer queue";
kLogger.debug() << "Exiting thread";
return;
}

m_progressInfo.current_track.reset();
m_progressInfo.track_progress = 0;
Expand Down Expand Up @@ -323,16 +337,15 @@ void AnalyzerQueue::run() {
audioSrcCfg.setChannelCount(kAnalysisChannels);
mixxx::AudioSourcePointer pAudioSource(soundSourceProxy.openAudioSource(audioSrcCfg));
if (!pAudioSource) {
qWarning() << "Failed to open file for analyzing:" << nextTrack->getLocation();
kLogger.warning() << "Failed to open file for analyzing:" << nextTrack->getLocation();
emptyCheck();
continue;
}

QListIterator<Analyzer*> it(m_aq);
bool processTrack = false;
while (it.hasNext()) {
for (auto const& pAnalyzer: m_pAnalyzers) {
// Make sure not to short-circuit initialize(...)
if (it.next()->initialize(nextTrack, pAudioSource->getSamplingRate(), pAudioSource->getFrameCount() * kAnalysisChannels)) {
if (pAnalyzer->initialize(nextTrack, pAudioSource->getSamplingRate(), pAudioSource->getFrameCount() * kAnalysisChannels)) {
processTrack = true;
}
}
Expand All @@ -346,26 +359,24 @@ void AnalyzerQueue::run() {
bool completed = doAnalysis(nextTrack, pAudioSource);
if (!completed) {
// This track was cancelled
QListIterator<Analyzer*> itf(m_aq);
while (itf.hasNext()) {
itf.next()->cleanup(nextTrack);
for (auto const& pAnalyzer: m_pAnalyzers) {
pAnalyzer->cleanup(nextTrack);
}
queueAnalyseTrack(nextTrack);
emitUpdateProgress(nextTrack, 0);
} else {
// 100% - FINALIZE_PERCENT finished
emitUpdateProgress(nextTrack, 1000 - FINALIZE_PROMILLE);
// This takes around 3 sec on a Atom Netbook
QListIterator<Analyzer*> itf(m_aq);
while (itf.hasNext()) {
itf.next()->finalize(nextTrack);
for (auto const& pAnalyzer: m_pAnalyzers) {
pAnalyzer->finalize(nextTrack);
}
emit(trackDone(nextTrack));
emitUpdateProgress(nextTrack, 1000); // 100%
}
} else {
emitUpdateProgress(nextTrack, 1000); // 100%
qDebug() << "Skipping track analysis because no analyzer initialized.";
kLogger.debug() << "Skipping track analysis because no analyzer initialized.";
}
emptyCheck();
}
Expand Down Expand Up @@ -431,39 +442,3 @@ void AnalyzerQueue::queueAnalyseTrack(TrackPointer tio) {
}
m_qm.unlock();
}

// static
AnalyzerQueue* AnalyzerQueue::createDefaultAnalyzerQueue(
UserSettingsPointer pConfig, TrackCollection* pTrackCollection) {
AnalyzerQueue* ret = new AnalyzerQueue(pTrackCollection);

ret->addAnalyzer(new AnalyzerWaveform(pConfig));
ret->addAnalyzer(new AnalyzerGain(pConfig));
ret->addAnalyzer(new AnalyzerEbur128(pConfig));
#ifdef __VAMP__
ret->addAnalyzer(new AnalyzerBeats(pConfig));
ret->addAnalyzer(new AnalyzerKey(pConfig));
#endif

ret->start(QThread::LowPriority);
return ret;
}

// static
AnalyzerQueue* AnalyzerQueue::createAnalysisFeatureAnalyzerQueue(
UserSettingsPointer pConfig, TrackCollection* pTrackCollection) {
AnalyzerQueue* ret = new AnalyzerQueue(pTrackCollection);

if (pConfig->getValue<bool>(ConfigKey("[Library]", "EnableWaveformGenerationWithAnalysis"))) {
ret->addAnalyzer(new AnalyzerWaveform(pConfig));
}
ret->addAnalyzer(new AnalyzerGain(pConfig));
ret->addAnalyzer(new AnalyzerEbur128(pConfig));
#ifdef __VAMP__
ret->addAnalyzer(new AnalyzerBeats(pConfig));
ret->addAnalyzer(new AnalyzerKey(pConfig));
#endif

ret->start(QThread::LowPriority);
return ret;
}
30 changes: 18 additions & 12 deletions src/analyzer/analyzerqueue.h
Original file line number Diff line number Diff line change
@@ -1,37 +1,40 @@
#ifndef ANALYZER_ANALYZERQUEUE_H
#define ANALYZER_ANALYZERQUEUE_H

#include <QList>
#include <QThread>
#include <QQueue>
#include <QWaitCondition>
#include <QSemaphore>

#include <vector>

#include "analyzer/analyzer.h"
#include "preferences/usersettings.h"
#include "sources/audiosource.h"
#include "track/track.h"
#include "util/db/dbconnectionpool.h"
#include "util/samplebuffer.h"
#include "util/memory.h"

class TrackCollection;
class Analyzer;

class AnalyzerQueue : public QThread {
Q_OBJECT

public:
AnalyzerQueue(TrackCollection* pTrackCollection);
virtual ~AnalyzerQueue();
enum class Mode {
Default,
WithoutWaveform,
};

AnalyzerQueue(
mixxx::DbConnectionPoolPtr pDbConnectionPool,
const UserSettingsPointer& pConfig,
Mode mode = Mode::Default);
~AnalyzerQueue() override;

void stop();
void queueAnalyseTrack(TrackPointer tio);

static AnalyzerQueue* createDefaultAnalyzerQueue(
UserSettingsPointer pConfig, TrackCollection* pTrackCollection);
static AnalyzerQueue* createAnalysisFeatureAnalyzerQueue(
UserSettingsPointer pConfig, TrackCollection* pTrackCollection);

public slots:
void slotAnalyseTrack(TrackPointer tio);
void slotUpdateProgress();
Expand All @@ -55,9 +58,12 @@ class AnalyzerQueue : public QThread {
QSemaphore sema;
};

void addAnalyzer(Analyzer* an);
mixxx::DbConnectionPoolPtr m_pDbConnectionPool;

typedef std::unique_ptr<Analyzer> AnalyzerPtr;
std::vector<AnalyzerPtr> m_pAnalyzers;

QList<Analyzer*> m_aq;
void execThread();

bool isLoadedTrackWaiting(TrackPointer analysingTrack);
TrackPointer dequeueNextBlocking();
Expand Down
Loading