From 005f20bfa976779e7e3cc6024e85ae1f6138e7ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oliver=20Gro=C3=9F?= Date: Thu, 23 Apr 2009 10:15:30 +0200 Subject: [PATCH] add queue and "serialize" transfer --- common.h | 5 +- ctransfermanager.cpp | 50 ++++--- ctransfermanager.h | 2 +- cworker.cpp | 295 +++++++++++++++++++---------------------- cworker.h | 5 +- qtransferlistmodel.cpp | 10 +- 6 files changed, 187 insertions(+), 180 deletions(-) diff --git a/common.h b/common.h index 3849af2..3ea56d4 100644 --- a/common.h +++ b/common.h @@ -28,8 +28,8 @@ #define DESCRIPTOR_STRING "QFTH" namespace qftrans { - enum TransferStatus {TS_WAITING = 0, TS_ANNOUNCING = 1, TS_TRANSFERING = 2, TS_CLOSING = 3, TS_FINISHED = 4}; - enum HeaderType {HT_ID = 0, HT_SIZE = 1, HT_NAME = 2, HT_DATA = 3, HT_CLOSE = 4, HT_ACK = 5, HT_CANCEL = 6}; + enum TransferStatus {TS_SUSPENDED = 0, TS_PENDING = 1, TS_ANNOUNCING = 2, TS_WAITING = 3, TS_TRANSFERING = 4, TS_CLOSING = 5, TS_FINISHED = 6}; + enum HeaderType {HT_ANNOUNCE = 0, HT_DATA = 1, HT_CLOSE = 2, HT_ACK = 3, HT_CANCEL = 4}; struct TransferData { QString fileDir; @@ -39,6 +39,7 @@ namespace qftrans { qint64 size; qint64 transfered; HeaderType lastHeader; + quint32 id; }; struct TransferHeader { diff --git a/ctransfermanager.cpp b/ctransfermanager.cpp index 2ed8507..a25f6f2 100644 --- a/ctransfermanager.cpp +++ b/ctransfermanager.cpp @@ -45,8 +45,8 @@ namespace qftrans { m_ActAsServer(false), m_Connected(false), m_DestinationDir(QDir::homePath()), - m_TransferThread(NULL), - m_SelectedTransfer(NULL) + m_TransferThread(NULL) +// m_SelectedTransfer(NULL) { setupUi(); } @@ -71,8 +71,8 @@ namespace qftrans { newUpload->fileName = fileInfo.fileName(); newUpload->size = fileInfo.size(); newUpload->transfered = 0; - newUpload->status = TS_WAITING; - newUpload->lastHeader = HT_ID; + newUpload->status = TS_SUSPENDED; + newUpload->lastHeader = HT_ANNOUNCE; newUpload->localFile = true; addTransfer(newUpload); @@ -130,15 +130,28 @@ namespace qftrans { } void CTransferManager::restartSelected() { - emit uploadAddRequested(m_SelectedTransfer); - emit uploadRestartRequested(m_SelectedTransfer); + QModelIndexList selectedIndexes = m_TransferView->selectionModel()->selectedIndexes(); + + foreach (QModelIndex i, selectedIndexes) { + emit uploadAddRequested(static_cast(i.internalPointer())); + } + +// emit uploadAddRequested(m_SelectedTransfer); } void CTransferManager::cancelSelected() { - if (m_SelectedTransfer->localFile) + QModelIndexList selectedIndexes = m_TransferView->selectionModel()->selectedIndexes(); + + foreach (QModelIndex i, selectedIndexes) { + if (static_cast(i.internalPointer())->localFile) + emit uploadRemoveRequested(static_cast(i.internalPointer())); + else + emit downloadRemoveRequested(static_cast(i.internalPointer())); + } +/* if (m_SelectedTransfer->localFile) emit uploadRemoveRequested(m_SelectedTransfer); else - emit downloadRemoveRequested(m_SelectedTransfer); + emit downloadRemoveRequested(m_SelectedTransfer);*/ } inline void CTransferManager::setupUi() { @@ -181,6 +194,7 @@ namespace qftrans { currentMenu->addAction(tr("About &Qt"), qApp, SLOT(aboutQt())); m_TransferView = new QListView(this); + m_TransferView->setSelectionMode(QAbstractItemView::ExtendedSelection); setCentralWidget(m_TransferView); QTransferListModel * listModel = new QTransferListModel(&m_Data); @@ -196,8 +210,7 @@ namespace qftrans { updateConnectionState(false); m_ConnectionDialog = new CConnectionDialog(this); - -} + } void CTransferManager::updateConnectionState(bool connected) { m_Connected = connected; @@ -231,7 +244,7 @@ namespace qftrans { connect(worker, SIGNAL(uploadUpdated(TransferData *)), model, SLOT(updateEntry(TransferData *))); connect(this, SIGNAL(uploadAddRequested(TransferData *)), worker, SLOT(addUpload(TransferData *))); - connect(this, SIGNAL(uploadRestartRequested(TransferData *)), worker, SLOT(startUpload(TransferData *))); +// connect(this, SIGNAL(uploadRestartRequested(TransferData *)), worker, SLOT(startUpload(TransferData *))); connect(this, SIGNAL(uploadRemoveRequested(TransferData *)), worker, SLOT(removeUpload(TransferData *))); connect(this, SIGNAL(downloadRemoveRequested(TransferData *)), worker, SLOT(removeDownload(TransferData *))); @@ -239,13 +252,16 @@ namespace qftrans { void CTransferManager::handleSelectionChanged(const QItemSelection & selected) { QModelIndexList selectedIndexes = selected.indexes(); - if (selectedIndexes.isEmpty()) - m_SelectedTransfer = NULL; - else - m_SelectedTransfer = static_cast(selectedIndexes[0].internalPointer()); +// if (selectedIndexes.isEmpty()) +// m_SelectedTransfer = NULL; +// else +// m_SelectedTransfer = static_cast(selectedIndexes[0].internalPointer()); +// + m_TransferRestartAction->setDisabled(selectedIndexes.isEmpty()); + m_TransferCancelAction->setDisabled(selectedIndexes.isEmpty()); - m_TransferRestartAction->setEnabled(m_SelectedTransfer && m_SelectedTransfer->localFile); - m_TransferCancelAction->setEnabled(m_SelectedTransfer && m_SelectedTransfer->status != TS_FINISHED); +/* m_TransferRestartAction->setEnabled(m_SelectedTransfer && m_SelectedTransfer->localFile); + m_TransferCancelAction->setEnabled(m_SelectedTransfer && m_SelectedTransfer->status != TS_FINISHED);*/ } void CTransferManager::addTransfer(TransferData * transfer) { diff --git a/ctransfermanager.h b/ctransfermanager.h index 64c3190..80e15a2 100644 --- a/ctransfermanager.h +++ b/ctransfermanager.h @@ -85,7 +85,7 @@ namespace qftrans { QListView * m_TransferView; - TransferData * m_SelectedTransfer; +// TransferData * m_SelectedTransfer; void setupUi(); diff --git a/cworker.cpp b/cworker.cpp index 1481bf2..9d80d51 100644 --- a/cworker.cpp +++ b/cworker.cpp @@ -38,44 +38,59 @@ namespace qftrans { //TODO addUpload void CWorker::addUpload(TransferData * transfer) { - if (!m_UploadDict.contains(transfer)) { + transfer->status = TS_PENDING; + + if (!m_Uploads.contains(transfer)) { qDebug() << "adding upload"; - transfer->status = TS_WAITING; - m_UploadDict.insert(transfer, m_IdCounter); + transfer->id = m_IdCounter; m_IdCounter++; + m_Uploads.enqueue(transfer); + } + else + qDebug() << "(re)adding upload"; + + if (m_Socket->bytesToWrite() == 0) { + qDebug() << "executing upload"; + writeData(); } } //TODO removeUpload void CWorker::removeUpload(TransferData * transfer) { - if (m_UploadDict.contains(transfer) && transfer->status != TS_WAITING) { + if (m_Uploads.contains(transfer) && transfer->status > TS_PENDING) { transfer->status = TS_CLOSING; if (m_Socket->bytesToWrite() == 0) { - qDebug() << "executing upload"; - writeData(); - } - } - } - //TODO startUpload - void CWorker::startUpload(TransferData * transfer) { - if (m_UploadDict.contains(transfer) && transfer->status == TS_WAITING) { - qDebug() << "scheduling upload for announce"; - transfer->status = TS_ANNOUNCING; - if (m_Socket->bytesToWrite() == 0) { - qDebug() << "executing upload"; + qDebug() << "executing closedown"; writeData(); } } } + //TODO ? startUpload +// void CWorker::startUpload(TransferData * transfer) { +// if (m_Uploads.contains(transfer) && transfer->status != TS_PENDING) +// transfer->status = TS_PENDING; +// +// if (m_Socket->bytesToWrite() == 0) { +// qDebug() << "executing upload"; +// writeData(); +// } +// } void CWorker::startUploadData(quint32 id) { - TransferData * transfer = m_UploadDict.key(id); - if (transfer) { - qDebug() << "scheduling upload for transfer"; - transfer->status = TS_TRANSFERING; - if (m_Socket->bytesToWrite() == 0) { - qDebug() << "executing upload"; - writeData(); - } + if (m_Uploads.isEmpty()) + return; + + TransferData * transfer = m_Uploads.head(); /*m_UploadDict.key(id);*/ + + if (transfer->id != id) { + qDebug() << "got invalid id:" << id << "... expected:" << transfer->id; + return; + } + + qDebug() << "scheduling upload for transfer"; + transfer->status = TS_TRANSFERING; + if (m_Socket->bytesToWrite() == 0) { + qDebug() << "executing upload"; + writeData(); } } @@ -85,7 +100,6 @@ namespace qftrans { out.setVersion(QDataStream::Qt_4_0); TransferHeader header; -// header.descriptor = DESCRIPTOR_STRING; header.type = HT_ACK; header.id = id; @@ -109,141 +123,114 @@ namespace qftrans { } void CWorker::writeData() { - if (m_UploadDict.isEmpty()) { + if (m_Uploads.isEmpty()) { +// if (m_UploadDict.isEmpty()) { qDebug() << "no more uploads."; return; } + TransferData * transfer = m_Uploads.head(); + + if (transfer->status == TS_PENDING) + transfer->status = TS_ANNOUNCING; + QByteArray block; QDataStream out(&block, QIODevice::ReadWrite); out.setVersion(QDataStream::Qt_4_0); - QTransferDataList transfers = m_UploadDict.keys(); - QTransferDataList::iterator i; - for (i = transfers.begin(); i != transfers.end(); ++i) { - block.clear(); - - TransferHeader header; - switch ((*i)->status) { - case TS_ANNOUNCING: { - qDebug() << "announcing" << m_UploadDict[*i]; - if ((*i)->transfered) - (*i)->transfered = 0; - - // ID header with no data - header.type = HT_ID; - header.id = m_UploadDict[*i]; - header.length = 0; - - out.writeRawData((const char *)(&header), sizeof(TransferHeader)); - - //size header with size - header.type = HT_SIZE; - header.length = sizeof(qint64); - - out.writeRawData((const char *)(&header), sizeof(TransferHeader)); - - qDebug() << "sending size" << (*i)->size; - out << (*i)->size; - - //name header with file name - - qint64 oldPosition = block.size(); - - //fake write filename - out << (*i)->fileName; - out.device()->seek(oldPosition); - - header.type = HT_NAME; - header.length = quint32(block.size() - oldPosition); - -// qDebug() << "pos" << out.device()->pos() << block.size(); - out.writeRawData((const char *)(&header), sizeof(TransferHeader)); -// qDebug() << "pos post header" << out.device()->pos() << block.size(); - -// qint64 oldPosition = block.size() - sizeof(quint32); - - qDebug() << "sending filename" << (*i)->fileName; - out << (*i)->fileName; -// qDebug() << "pos post filename" << out.device()->pos() << block.size(); -// out.device()->seek(oldPosition); -// qDebug() << "pos post seek" << out.device()->pos(); - - -/* qDebug() << "filename data length" << header.length; - out << header.length; - out.device()->seek(oldPosition); - out >> header.length; - qDebug() << "written length" << header.length; - out.device()->seek(block.size()); - qDebug() << "pos post length correct" << out.device()->pos() << block.size();*/ - - (*i)->status = TS_WAITING; - } - break; - case TS_TRANSFERING: { -// qDebug() << "transfering" << m_DataDict[*i]; - header.type = HT_DATA; - header.id = m_UploadDict[*i]; - if ((*i)->transfered + DATA_BLOCK_SIZE < (*i)->size) - header.length = DATA_BLOCK_SIZE; - else - header.length =(*i)->size - (*i)->transfered; - - out.writeRawData((const char *)(&header), sizeof(TransferHeader)); - - char * buffer = new char[header.length]; - - QFile file((*i)->fileDir + QDir::separator() + (*i)->fileName); - file.open(QIODevice::ReadOnly); - QDataStream in(&file); - if (in.skipRawData((*i)->transfered) == -1) { - qDebug() << "filename" << file.fileName(); - qDebug() << "skip error"; - qDebug() << "file size" << (*i)->size; - qDebug() << "transfered bytes" << (*i)->transfered; - } -// in.device()->seek((*i)->transfered); - if (in.readRawData(buffer, header.length) == -1) { - qDebug() << "Read error"; - qDebug() << "buffer size" << header.length; - qDebug() << "file size" << (*i)->size; - qDebug() << "transfered bytes" << (*i)->transfered; - } - - out.writeRawData((const char *)(buffer), header.length); - (*i)->transfered += header.length; - delete[] buffer; - - if ((*i)->transfered >= (*i)->size) - (*i)->status = TS_CLOSING; - - if ((*i)->transfered > (*i)->size) { - qDebug() << "overread?"; - qDebug() << "file size" << (*i)->size; - qDebug() << "transfered bytes" << (*i)->transfered; - } - - emit uploadUpdated(*i); - } - break; - case TS_CLOSING: - header.type = HT_CLOSE; - header.id = m_UploadDict[*i]; - header.length = 0; + TransferHeader header; + switch (transfer->status) { + case TS_ANNOUNCING: + { + qDebug() << "announcing" << transfer->id; + if (transfer->transfered) + transfer->transfered = 0; + + qint64 oldPosition = block.size(); + + //fake write filename + out << transfer->fileName; + out.device()->seek(oldPosition); + + // ANNOUNCE header with size and filename + header.type = HT_ANNOUNCE; + header.id = transfer->id; + header.length = quint32(block.size() - oldPosition) + sizeof(qint64); out.writeRawData((const char *)(&header), sizeof(TransferHeader)); - (*i)->status = TS_FINISHED; + qDebug() << "sending size" << transfer->size; + out << transfer->size; - m_UploadDict.remove(*i); - break; - default: - break; + qDebug() << "sending filename" << transfer->fileName; + out << transfer->fileName; + + transfer->status = TS_WAITING; } - if (!block.isEmpty()) - m_Socket->write(block); + break; + case TS_TRANSFERING: + { + header.type = HT_DATA; + header.id = transfer->id; + if (transfer->transfered + DATA_BLOCK_SIZE < transfer->size) + header.length = DATA_BLOCK_SIZE; + else + header.length = transfer->size - transfer->transfered; + + out.writeRawData((const char *)(&header), sizeof(TransferHeader)); + + char * buffer = new char[header.length]; + + QFile file(transfer->fileDir + QDir::separator() + transfer->fileName); + file.open(QIODevice::ReadOnly); + QDataStream in(&file); + if (in.skipRawData(transfer->transfered) == -1) { + qDebug() << "filename" << file.fileName(); + qDebug() << "skip error"; + qDebug() << "file size" << transfer->size; + qDebug() << "transfered bytes" << transfer->transfered; + } + if (in.readRawData(buffer, header.length) == -1) { + qDebug() << "Read error"; + qDebug() << "buffer size" << header.length; + qDebug() << "file size" << transfer->size; + qDebug() << "transfered bytes" << transfer->transfered; + } + + out.writeRawData((const char *)(buffer), header.length); + transfer->transfered += header.length; + delete[] buffer; + + if (transfer->transfered >= transfer->size) + transfer->status = TS_CLOSING; + + if (transfer->transfered > transfer->size) { + qDebug() << "overread?"; + qDebug() << "file size" << transfer->size; + qDebug() << "transfered bytes" << transfer->transfered; + } + + emit uploadUpdated(transfer); + } + break; + case TS_CLOSING: + header.type = HT_CLOSE; + header.id = transfer->id; + header.length = 0; + + out.writeRawData((const char *)(&header), sizeof(TransferHeader)); + + transfer->status = TS_FINISHED; + + m_Uploads.dequeue(); + emit uploadUpdated(transfer); + break; + default: + break; } + + if (!block.isEmpty()) + m_Socket->write(block); } void CWorker::readHeader(QDataStream & in) { @@ -292,7 +279,7 @@ namespace qftrans { target = m_DownloadDict[m_LastHeader.id]; } //id not in dict - else if (m_LastHeader.type == HT_ID) { + else if (m_LastHeader.type == HT_ANNOUNCE) { //new file announced qDebug() << "download announced"; TransferData * newFile = new TransferData; @@ -302,31 +289,27 @@ namespace qftrans { newFile->localFile = false; newFile->status = TS_WAITING; newFile->fileDir = m_DestinationDir; + target = newFile; } //check valid target and data available if (target && m_LastHeader.length) { switch (m_LastHeader.type) { - case HT_ID: - target->status = TS_WAITING; -// QFile::remove(target->fileDir + QDir::separator() + target->fileName); - emit downloadUpdated(target); - break; - case HT_SIZE: + case HT_ANNOUNCE: qDebug() << "reading size.."; in >> target->size; qDebug() << "got size:" << target->size; - break; - case HT_NAME: qDebug() << "reading filename.."; in >> target->fileName; qDebug() << "got filename:" << target->fileName; + emit downloadUpdated(target); qDebug() << "trying to send HT_ACK"; //TODO better do this async sendAcknowledge(m_LastHeader.id); break; - case HT_DATA: { + case HT_DATA: + { // qDebug() << "got data" << target->transfered << '/' << target->size; QFile file(target->fileDir + QDir::separator() + target->fileName); if (target->status == TS_WAITING) { diff --git a/cworker.h b/cworker.h index 6557462..fb74226 100644 --- a/cworker.h +++ b/cworker.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "common.h" namespace qftrans { @@ -45,7 +46,7 @@ namespace qftrans { public slots: void addUpload(TransferData * transfer); void removeUpload(TransferData * transfer); - void startUpload(TransferData * transfer); +// void startUpload(TransferData * transfer); void removeDownload(TransferData * data); @@ -66,7 +67,7 @@ namespace qftrans { QString m_DestinationDir; quint32 m_IdCounter; - QTransferIdHash m_UploadDict; + QQueue m_Uploads; QTransferDataHash m_DownloadDict; diff --git a/qtransferlistmodel.cpp b/qtransferlistmodel.cpp index 4db6e50..2edcb68 100644 --- a/qtransferlistmodel.cpp +++ b/qtransferlistmodel.cpp @@ -45,12 +45,18 @@ namespace qftrans { QString status; switch (data->status) { - case TS_WAITING: - status = tr("waiting"); + case TS_SUSPENDED: + status = tr("suspended"); + break; + case TS_PENDING: + status = tr("pending"); break; case TS_ANNOUNCING: status = tr("announcing to peer"); break; + case TS_WAITING: + status = tr("waiting"); + break; case TS_TRANSFERING: status = data->localFile ? tr("Sending") : tr("Recieving"); break;