add queue and "serialize" transfer

This commit is contained in:
Oliver Groß 2009-04-23 10:15:30 +02:00
parent bc21dae968
commit 005f20bfa9
6 changed files with 187 additions and 180 deletions

View File

@ -28,8 +28,8 @@
#define DESCRIPTOR_STRING "QFTH"
namespace qftrans {
enum TransferStatus {TS_WAITING = 0, TS_ANNOUNCING = 1, TS_TRANSFERING = 2, TS_CLOSING = 3, TS_FINISHED = 4};
enum HeaderType {HT_ID = 0, HT_SIZE = 1, HT_NAME = 2, HT_DATA = 3, HT_CLOSE = 4, HT_ACK = 5, HT_CANCEL = 6};
enum TransferStatus {TS_SUSPENDED = 0, TS_PENDING = 1, TS_ANNOUNCING = 2, TS_WAITING = 3, TS_TRANSFERING = 4, TS_CLOSING = 5, TS_FINISHED = 6};
enum HeaderType {HT_ANNOUNCE = 0, HT_DATA = 1, HT_CLOSE = 2, HT_ACK = 3, HT_CANCEL = 4};
struct TransferData {
QString fileDir;
@ -39,6 +39,7 @@ namespace qftrans {
qint64 size;
qint64 transfered;
HeaderType lastHeader;
quint32 id;
};
struct TransferHeader {

View File

@ -45,8 +45,8 @@ namespace qftrans {
m_ActAsServer(false),
m_Connected(false),
m_DestinationDir(QDir::homePath()),
m_TransferThread(NULL),
m_SelectedTransfer(NULL)
m_TransferThread(NULL)
// m_SelectedTransfer(NULL)
{
setupUi();
}
@ -71,8 +71,8 @@ namespace qftrans {
newUpload->fileName = fileInfo.fileName();
newUpload->size = fileInfo.size();
newUpload->transfered = 0;
newUpload->status = TS_WAITING;
newUpload->lastHeader = HT_ID;
newUpload->status = TS_SUSPENDED;
newUpload->lastHeader = HT_ANNOUNCE;
newUpload->localFile = true;
addTransfer(newUpload);
@ -130,15 +130,28 @@ namespace qftrans {
}
void CTransferManager::restartSelected() {
emit uploadAddRequested(m_SelectedTransfer);
emit uploadRestartRequested(m_SelectedTransfer);
QModelIndexList selectedIndexes = m_TransferView->selectionModel()->selectedIndexes();
foreach (QModelIndex i, selectedIndexes) {
emit uploadAddRequested(static_cast<TransferData *>(i.internalPointer()));
}
// emit uploadAddRequested(m_SelectedTransfer);
}
void CTransferManager::cancelSelected() {
if (m_SelectedTransfer->localFile)
QModelIndexList selectedIndexes = m_TransferView->selectionModel()->selectedIndexes();
foreach (QModelIndex i, selectedIndexes) {
if (static_cast<TransferData *>(i.internalPointer())->localFile)
emit uploadRemoveRequested(static_cast<TransferData *>(i.internalPointer()));
else
emit downloadRemoveRequested(static_cast<TransferData *>(i.internalPointer()));
}
/* if (m_SelectedTransfer->localFile)
emit uploadRemoveRequested(m_SelectedTransfer);
else
emit downloadRemoveRequested(m_SelectedTransfer);
emit downloadRemoveRequested(m_SelectedTransfer);*/
}
inline void CTransferManager::setupUi() {
@ -181,6 +194,7 @@ namespace qftrans {
currentMenu->addAction(tr("About &Qt"), qApp, SLOT(aboutQt()));
m_TransferView = new QListView(this);
m_TransferView->setSelectionMode(QAbstractItemView::ExtendedSelection);
setCentralWidget(m_TransferView);
QTransferListModel * listModel = new QTransferListModel(&m_Data);
@ -196,8 +210,7 @@ namespace qftrans {
updateConnectionState(false);
m_ConnectionDialog = new CConnectionDialog(this);
}
}
void CTransferManager::updateConnectionState(bool connected) {
m_Connected = connected;
@ -231,7 +244,7 @@ namespace qftrans {
connect(worker, SIGNAL(uploadUpdated(TransferData *)), model, SLOT(updateEntry(TransferData *)));
connect(this, SIGNAL(uploadAddRequested(TransferData *)), worker, SLOT(addUpload(TransferData *)));
connect(this, SIGNAL(uploadRestartRequested(TransferData *)), worker, SLOT(startUpload(TransferData *)));
// connect(this, SIGNAL(uploadRestartRequested(TransferData *)), worker, SLOT(startUpload(TransferData *)));
connect(this, SIGNAL(uploadRemoveRequested(TransferData *)), worker, SLOT(removeUpload(TransferData *)));
connect(this, SIGNAL(downloadRemoveRequested(TransferData *)), worker, SLOT(removeDownload(TransferData *)));
@ -239,13 +252,16 @@ namespace qftrans {
void CTransferManager::handleSelectionChanged(const QItemSelection & selected) {
QModelIndexList selectedIndexes = selected.indexes();
if (selectedIndexes.isEmpty())
m_SelectedTransfer = NULL;
else
m_SelectedTransfer = static_cast<TransferData *>(selectedIndexes[0].internalPointer());
// if (selectedIndexes.isEmpty())
// m_SelectedTransfer = NULL;
// else
// m_SelectedTransfer = static_cast<TransferData *>(selectedIndexes[0].internalPointer());
//
m_TransferRestartAction->setDisabled(selectedIndexes.isEmpty());
m_TransferCancelAction->setDisabled(selectedIndexes.isEmpty());
m_TransferRestartAction->setEnabled(m_SelectedTransfer && m_SelectedTransfer->localFile);
m_TransferCancelAction->setEnabled(m_SelectedTransfer && m_SelectedTransfer->status != TS_FINISHED);
/* m_TransferRestartAction->setEnabled(m_SelectedTransfer && m_SelectedTransfer->localFile);
m_TransferCancelAction->setEnabled(m_SelectedTransfer && m_SelectedTransfer->status != TS_FINISHED);*/
}
void CTransferManager::addTransfer(TransferData * transfer) {

View File

@ -85,7 +85,7 @@ namespace qftrans {
QListView * m_TransferView;
TransferData * m_SelectedTransfer;
// TransferData * m_SelectedTransfer;
void setupUi();

View File

@ -38,44 +38,59 @@ namespace qftrans {
//TODO addUpload
void CWorker::addUpload(TransferData * transfer) {
if (!m_UploadDict.contains(transfer)) {
transfer->status = TS_PENDING;
if (!m_Uploads.contains(transfer)) {
qDebug() << "adding upload";
transfer->status = TS_WAITING;
m_UploadDict.insert(transfer, m_IdCounter);
transfer->id = m_IdCounter;
m_IdCounter++;
m_Uploads.enqueue(transfer);
}
else
qDebug() << "(re)adding upload";
if (m_Socket->bytesToWrite() == 0) {
qDebug() << "executing upload";
writeData();
}
}
//TODO removeUpload
void CWorker::removeUpload(TransferData * transfer) {
if (m_UploadDict.contains(transfer) && transfer->status != TS_WAITING) {
if (m_Uploads.contains(transfer) && transfer->status > TS_PENDING) {
transfer->status = TS_CLOSING;
if (m_Socket->bytesToWrite() == 0) {
qDebug() << "executing upload";
writeData();
}
}
}
//TODO startUpload
void CWorker::startUpload(TransferData * transfer) {
if (m_UploadDict.contains(transfer) && transfer->status == TS_WAITING) {
qDebug() << "scheduling upload for announce";
transfer->status = TS_ANNOUNCING;
if (m_Socket->bytesToWrite() == 0) {
qDebug() << "executing upload";
qDebug() << "executing closedown";
writeData();
}
}
}
//TODO ? startUpload
// void CWorker::startUpload(TransferData * transfer) {
// if (m_Uploads.contains(transfer) && transfer->status != TS_PENDING)
// transfer->status = TS_PENDING;
//
// if (m_Socket->bytesToWrite() == 0) {
// qDebug() << "executing upload";
// writeData();
// }
// }
void CWorker::startUploadData(quint32 id) {
TransferData * transfer = m_UploadDict.key(id);
if (transfer) {
qDebug() << "scheduling upload for transfer";
transfer->status = TS_TRANSFERING;
if (m_Socket->bytesToWrite() == 0) {
qDebug() << "executing upload";
writeData();
}
if (m_Uploads.isEmpty())
return;
TransferData * transfer = m_Uploads.head(); /*m_UploadDict.key(id);*/
if (transfer->id != id) {
qDebug() << "got invalid id:" << id << "... expected:" << transfer->id;
return;
}
qDebug() << "scheduling upload for transfer";
transfer->status = TS_TRANSFERING;
if (m_Socket->bytesToWrite() == 0) {
qDebug() << "executing upload";
writeData();
}
}
@ -85,7 +100,6 @@ namespace qftrans {
out.setVersion(QDataStream::Qt_4_0);
TransferHeader header;
// header.descriptor = DESCRIPTOR_STRING;
header.type = HT_ACK;
header.id = id;
@ -109,141 +123,114 @@ namespace qftrans {
}
void CWorker::writeData() {
if (m_UploadDict.isEmpty()) {
if (m_Uploads.isEmpty()) {
// if (m_UploadDict.isEmpty()) {
qDebug() << "no more uploads.";
return;
}
TransferData * transfer = m_Uploads.head();
if (transfer->status == TS_PENDING)
transfer->status = TS_ANNOUNCING;
QByteArray block;
QDataStream out(&block, QIODevice::ReadWrite);
out.setVersion(QDataStream::Qt_4_0);
QTransferDataList transfers = m_UploadDict.keys();
QTransferDataList::iterator i;
for (i = transfers.begin(); i != transfers.end(); ++i) {
block.clear();
TransferHeader header;
switch ((*i)->status) {
case TS_ANNOUNCING: {
qDebug() << "announcing" << m_UploadDict[*i];
if ((*i)->transfered)
(*i)->transfered = 0;
// ID header with no data
header.type = HT_ID;
header.id = m_UploadDict[*i];
header.length = 0;
out.writeRawData((const char *)(&header), sizeof(TransferHeader));
//size header with size
header.type = HT_SIZE;
header.length = sizeof(qint64);
out.writeRawData((const char *)(&header), sizeof(TransferHeader));
qDebug() << "sending size" << (*i)->size;
out << (*i)->size;
//name header with file name
qint64 oldPosition = block.size();
//fake write filename
out << (*i)->fileName;
out.device()->seek(oldPosition);
header.type = HT_NAME;
header.length = quint32(block.size() - oldPosition);
// qDebug() << "pos" << out.device()->pos() << block.size();
out.writeRawData((const char *)(&header), sizeof(TransferHeader));
// qDebug() << "pos post header" << out.device()->pos() << block.size();
// qint64 oldPosition = block.size() - sizeof(quint32);
qDebug() << "sending filename" << (*i)->fileName;
out << (*i)->fileName;
// qDebug() << "pos post filename" << out.device()->pos() << block.size();
// out.device()->seek(oldPosition);
// qDebug() << "pos post seek" << out.device()->pos();
/* qDebug() << "filename data length" << header.length;
out << header.length;
out.device()->seek(oldPosition);
out >> header.length;
qDebug() << "written length" << header.length;
out.device()->seek(block.size());
qDebug() << "pos post length correct" << out.device()->pos() << block.size();*/
(*i)->status = TS_WAITING;
}
break;
case TS_TRANSFERING: {
// qDebug() << "transfering" << m_DataDict[*i];
header.type = HT_DATA;
header.id = m_UploadDict[*i];
if ((*i)->transfered + DATA_BLOCK_SIZE < (*i)->size)
header.length = DATA_BLOCK_SIZE;
else
header.length =(*i)->size - (*i)->transfered;
out.writeRawData((const char *)(&header), sizeof(TransferHeader));
char * buffer = new char[header.length];
QFile file((*i)->fileDir + QDir::separator() + (*i)->fileName);
file.open(QIODevice::ReadOnly);
QDataStream in(&file);
if (in.skipRawData((*i)->transfered) == -1) {
qDebug() << "filename" << file.fileName();
qDebug() << "skip error";
qDebug() << "file size" << (*i)->size;
qDebug() << "transfered bytes" << (*i)->transfered;
}
// in.device()->seek((*i)->transfered);
if (in.readRawData(buffer, header.length) == -1) {
qDebug() << "Read error";
qDebug() << "buffer size" << header.length;
qDebug() << "file size" << (*i)->size;
qDebug() << "transfered bytes" << (*i)->transfered;
}
out.writeRawData((const char *)(buffer), header.length);
(*i)->transfered += header.length;
delete[] buffer;
if ((*i)->transfered >= (*i)->size)
(*i)->status = TS_CLOSING;
if ((*i)->transfered > (*i)->size) {
qDebug() << "overread?";
qDebug() << "file size" << (*i)->size;
qDebug() << "transfered bytes" << (*i)->transfered;
}
emit uploadUpdated(*i);
}
break;
case TS_CLOSING:
header.type = HT_CLOSE;
header.id = m_UploadDict[*i];
header.length = 0;
TransferHeader header;
switch (transfer->status) {
case TS_ANNOUNCING:
{
qDebug() << "announcing" << transfer->id;
if (transfer->transfered)
transfer->transfered = 0;
qint64 oldPosition = block.size();
//fake write filename
out << transfer->fileName;
out.device()->seek(oldPosition);
// ANNOUNCE header with size and filename
header.type = HT_ANNOUNCE;
header.id = transfer->id;
header.length = quint32(block.size() - oldPosition) + sizeof(qint64);
out.writeRawData((const char *)(&header), sizeof(TransferHeader));
(*i)->status = TS_FINISHED;
qDebug() << "sending size" << transfer->size;
out << transfer->size;
m_UploadDict.remove(*i);
break;
default:
break;
qDebug() << "sending filename" << transfer->fileName;
out << transfer->fileName;
transfer->status = TS_WAITING;
}
if (!block.isEmpty())
m_Socket->write(block);
break;
case TS_TRANSFERING:
{
header.type = HT_DATA;
header.id = transfer->id;
if (transfer->transfered + DATA_BLOCK_SIZE < transfer->size)
header.length = DATA_BLOCK_SIZE;
else
header.length = transfer->size - transfer->transfered;
out.writeRawData((const char *)(&header), sizeof(TransferHeader));
char * buffer = new char[header.length];
QFile file(transfer->fileDir + QDir::separator() + transfer->fileName);
file.open(QIODevice::ReadOnly);
QDataStream in(&file);
if (in.skipRawData(transfer->transfered) == -1) {
qDebug() << "filename" << file.fileName();
qDebug() << "skip error";
qDebug() << "file size" << transfer->size;
qDebug() << "transfered bytes" << transfer->transfered;
}
if (in.readRawData(buffer, header.length) == -1) {
qDebug() << "Read error";
qDebug() << "buffer size" << header.length;
qDebug() << "file size" << transfer->size;
qDebug() << "transfered bytes" << transfer->transfered;
}
out.writeRawData((const char *)(buffer), header.length);
transfer->transfered += header.length;
delete[] buffer;
if (transfer->transfered >= transfer->size)
transfer->status = TS_CLOSING;
if (transfer->transfered > transfer->size) {
qDebug() << "overread?";
qDebug() << "file size" << transfer->size;
qDebug() << "transfered bytes" << transfer->transfered;
}
emit uploadUpdated(transfer);
}
break;
case TS_CLOSING:
header.type = HT_CLOSE;
header.id = transfer->id;
header.length = 0;
out.writeRawData((const char *)(&header), sizeof(TransferHeader));
transfer->status = TS_FINISHED;
m_Uploads.dequeue();
emit uploadUpdated(transfer);
break;
default:
break;
}
if (!block.isEmpty())
m_Socket->write(block);
}
void CWorker::readHeader(QDataStream & in) {
@ -292,7 +279,7 @@ namespace qftrans {
target = m_DownloadDict[m_LastHeader.id];
}
//id not in dict
else if (m_LastHeader.type == HT_ID) {
else if (m_LastHeader.type == HT_ANNOUNCE) {
//new file announced
qDebug() << "download announced";
TransferData * newFile = new TransferData;
@ -302,31 +289,27 @@ namespace qftrans {
newFile->localFile = false;
newFile->status = TS_WAITING;
newFile->fileDir = m_DestinationDir;
target = newFile;
}
//check valid target and data available
if (target && m_LastHeader.length) {
switch (m_LastHeader.type) {
case HT_ID:
target->status = TS_WAITING;
// QFile::remove(target->fileDir + QDir::separator() + target->fileName);
emit downloadUpdated(target);
break;
case HT_SIZE:
case HT_ANNOUNCE:
qDebug() << "reading size..";
in >> target->size;
qDebug() << "got size:" << target->size;
break;
case HT_NAME:
qDebug() << "reading filename..";
in >> target->fileName;
qDebug() << "got filename:" << target->fileName;
emit downloadUpdated(target);
qDebug() << "trying to send HT_ACK";
//TODO better do this async
sendAcknowledge(m_LastHeader.id);
break;
case HT_DATA: {
case HT_DATA:
{
// qDebug() << "got data" << target->transfered << '/' << target->size;
QFile file(target->fileDir + QDir::separator() + target->fileName);
if (target->status == TS_WAITING) {

View File

@ -23,6 +23,7 @@
#include <QObject>
#include <QTcpSocket>
#include <QDataStream>
#include <QQueue>
#include "common.h"
namespace qftrans {
@ -45,7 +46,7 @@ namespace qftrans {
public slots:
void addUpload(TransferData * transfer);
void removeUpload(TransferData * transfer);
void startUpload(TransferData * transfer);
// void startUpload(TransferData * transfer);
void removeDownload(TransferData * data);
@ -66,7 +67,7 @@ namespace qftrans {
QString m_DestinationDir;
quint32 m_IdCounter;
QTransferIdHash m_UploadDict;
QQueue<TransferData *> m_Uploads;
QTransferDataHash m_DownloadDict;

View File

@ -45,12 +45,18 @@ namespace qftrans {
QString status;
switch (data->status) {
case TS_WAITING:
status = tr("waiting");
case TS_SUSPENDED:
status = tr("suspended");
break;
case TS_PENDING:
status = tr("pending");
break;
case TS_ANNOUNCING:
status = tr("announcing to peer");
break;
case TS_WAITING:
status = tr("waiting");
break;
case TS_TRANSFERING:
status = data->localFile ? tr("Sending") : tr("Recieving");
break;