You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
408 lines
12 KiB
408 lines
12 KiB
/*************************************************************************** |
|
* Copyright (C) 2008 by Oliver Groß * |
|
* z.o.gross@gmx.de * |
|
* * |
|
* This program is free software; you can redistribute it and/or modify * |
|
* it under the terms of the GNU General Public License as published by * |
|
* the Free Software Foundation; either version 2 of the License, or * |
|
* (at your option) any later version. * |
|
* * |
|
* This program is distributed in the hope that it will be useful, * |
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of * |
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * |
|
* GNU General Public License for more details. * |
|
* * |
|
* You should have received a copy of the GNU General Public License * |
|
* along with this program; if not, write to the * |
|
* Free Software Foundation, Inc., * |
|
* 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * |
|
***************************************************************************/ |
|
#include "cworker.h" |
|
#include <QTcpSocket> |
|
#include <QByteArray> |
|
#include <QFile> |
|
#include <QDir> |
|
#include <QThread> |
|
|
|
namespace qftrans { |
|
/**
 * Creates a worker that performs all transfers over an already existing socket.
 *
 * The socket's error signal is connected directly; bytesWritten and readyRead
 * are connected queued, so writeData() and readData() are invoked from the
 * event loop rather than from inside the socket's signal emission.
 *
 * @param socket socket used for every read and write of this worker; the
 *               pointer is stored as-is
 * @param parent QObject parent, forwarded to QObject
 */
CWorker::CWorker(QTcpSocket * socket, QObject * parent) : QObject(parent), m_Socket(socket), m_IdCounter(0) {
	// readData() tests this flag before any header has been read, so it must
	// start out with a defined value.
	m_LastHeaderValid = false;

	connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)));
	connect(m_Socket, SIGNAL(bytesWritten(qint64)), this, SLOT(writeData()), Qt::QueuedConnection);
	connect(m_Socket, SIGNAL(readyRead()), this, SLOT(readData()), Qt::QueuedConnection);
	qDebug() << "worker created";
}
|
|
|
/**
 * Destroys the worker. Only a debug trace is written; nothing is released here.
 *
 * NOTE(review): TransferData objects that readData() allocated and that are
 * still stored in m_DownloadDict are not freed here - confirm who owns them.
 */
CWorker::~CWorker()
{
	qDebug() << "worker removed";
}
|
|
|
//TODO addUpload |
|
/**
 * Queues a transfer for upload, or resets an already queued one to pending.
 *
 * The status is set to TS_PENDING in both cases. A transfer that is not yet in
 * the queue additionally receives the next free id and is appended. When the
 * socket has no unsent bytes, writeData() is called immediately; otherwise the
 * bytesWritten connection made in the constructor will call it later.
 *
 * @param transfer transfer to (re)schedule; must not be null
 */
void CWorker::addUpload(TransferData * transfer) {
	transfer->status = TS_PENDING;

	const bool alreadyQueued = m_Uploads.contains(transfer);
	if (alreadyQueued) {
		qDebug() << "(re)adding upload";
	}
	else {
		qDebug() << "adding upload";
		transfer->id = m_IdCounter++;
		m_Uploads.enqueue(transfer);
	}

	// Bytes still waiting in the socket mean a bytesWritten signal is yet to come.
	if (m_Socket->bytesToWrite() != 0)
		return;

	qDebug() << "executing upload";
	writeData();
}
|
//TODO removeUpload |
|
/**
 * Requests the shutdown of a queued upload that has progressed past TS_PENDING.
 *
 * The transfer is switched to TS_CLOSING; if the socket has no unsent bytes,
 * writeData() is called right away. Transfers that are not queued, or whose
 * status is not greater than TS_PENDING, are left untouched.
 *
 * NOTE(review): a transfer that is still pending stays in the queue - confirm
 * whether that is intended (the function is marked TODO).
 *
 * @param transfer transfer to close down
 */
void CWorker::removeUpload(TransferData * transfer) {
	if (!m_Uploads.contains(transfer))
		return;

	if (transfer->status <= TS_PENDING)
		return;

	transfer->status = TS_CLOSING;

	if (m_Socket->bytesToWrite() != 0)
		return;

	qDebug() << "executing closedown";
	writeData();
}
|
//TODO ? startUpload |
|
// void CWorker::startUpload(TransferData * transfer) { |
|
// if (m_Uploads.contains(transfer) && transfer->status != TS_PENDING) |
|
// transfer->status = TS_PENDING; |
|
// |
|
// if (m_Socket->bytesToWrite() == 0) { |
|
// qDebug() << "executing upload"; |
|
// writeData(); |
|
// } |
|
// } |
|
|
|
/**
 * Switches the upload at the head of the queue to TS_TRANSFERING.
 *
 * Called from readData() when an HT_ACK arrives. The id must match the id of
 * the head of the upload queue; any other id is only logged. When the socket
 * has no unsent bytes the first data block is written immediately.
 *
 * @param id transfer id carried by the acknowledge
 */
void CWorker::startUploadData(quint32 id) {
	if (m_Uploads.isEmpty())
		return;

	TransferData * const current = m_Uploads.head();

	if (current->id == id) {
		qDebug() << "scheduling upload for transfer";
		current->status = TS_TRANSFERING;

		if (m_Socket->bytesToWrite() == 0) {
			qDebug() << "executing upload";
			writeData();
		}
	}
	else {
		qDebug() << "got invalid id:" << id << "... expected:" << current->id;
	}
}
|
|
|
/**
 * Sends an HT_ACK header without payload for the given transfer id.
 *
 * The header is written to the socket as its raw in-memory bytes. Only type,
 * id and length are set here; every other field keeps whatever value
 * TransferHeader's construction gives it (presumably including the
 * descriptor that readHeader() checks - TODO confirm).
 *
 * A failed write is logged as an error instead of being reported as sent.
 *
 * @param id id of the announced transfer that is being acknowledged
 */
void CWorker::sendAcknowledge(quint32 id) {
	TransferHeader header;
	header.type = HT_ACK;
	header.id = id;
	header.length = 0;

	qDebug() << "sending HT_ACK" << id;

	const qint64 written = m_Socket->write(reinterpret_cast<const char *>(&header),
	                                       static_cast<qint64>(sizeof(TransferHeader)));
	if (written == -1) {
		qDebug() << "error sending HT_ACK" << id << m_Socket->errorString();
		return;
	}

	qDebug() << "sent HT_ACK" << id;
}
|
|
|
//TODO CTransferThread::removeDownload |
|
/**
 * Placeholder for cancelling a download; currently does nothing.
 *
 * The parameter is intentionally unnamed because it is not used yet.
 */
void CWorker::removeDownload(TransferData *)
{
}
|
|
|
/**
 * Sets the directory that readData() assigns to newly announced downloads.
 *
 * Downloads that were already announced keep the directory they were created
 * with, because the value is copied into each TransferData at announce time.
 *
 * @param value new destination directory; it is only read, never modified
 */
void CWorker::setDestinationDir(QString & value)
{
	m_DestinationDir = value;
}
|
|
|
void CWorker::writeData() { |
|
if (m_Uploads.isEmpty()) { |
|
// if (m_UploadDict.isEmpty()) { |
|
qDebug() << "no more uploads."; |
|
return; |
|
} |
|
|
|
TransferData * transfer = m_Uploads.head(); |
|
|
|
if (transfer->status == TS_PENDING) |
|
transfer->status = TS_ANNOUNCING; |
|
|
|
QByteArray block; |
|
QDataStream out(&block, QIODevice::ReadWrite); |
|
out.setVersion(QDataStream::Qt_4_0); |
|
|
|
TransferHeader header; |
|
switch (transfer->status) { |
|
case TS_ANNOUNCING: |
|
{ |
|
qDebug() << "announcing" << transfer->id; |
|
if (transfer->transfered) |
|
transfer->transfered = 0; |
|
|
|
qint64 oldPosition = block.size(); |
|
|
|
//fake write filename |
|
out << transfer->fileName; |
|
out.device()->seek(oldPosition); |
|
|
|
// ANNOUNCE header with size and filename |
|
header.type = HT_ANNOUNCE; |
|
header.id = transfer->id; |
|
header.length = quint32(block.size() - oldPosition) + sizeof(qint64); |
|
|
|
out.writeRawData((const char *)(&header), sizeof(TransferHeader)); |
|
|
|
qDebug() << "sending size" << transfer->size; |
|
out << transfer->size; |
|
|
|
qDebug() << "sending filename" << transfer->fileName; |
|
out << transfer->fileName; |
|
|
|
transfer->status = TS_WAITING; |
|
} |
|
break; |
|
case TS_TRANSFERING: |
|
{ |
|
header.type = HT_DATA; |
|
header.id = transfer->id; |
|
if (transfer->transfered + DATA_BLOCK_SIZE < transfer->size) |
|
header.length = DATA_BLOCK_SIZE; |
|
else |
|
header.length = transfer->size - transfer->transfered; |
|
|
|
out.writeRawData((const char *)(&header), sizeof(TransferHeader)); |
|
|
|
char * buffer = new char[header.length]; |
|
|
|
QFile file(transfer->fileDir + QDir::separator() + transfer->fileName); |
|
file.open(QIODevice::ReadOnly); |
|
QDataStream in(&file); |
|
if (in.skipRawData(transfer->transfered) == -1) { |
|
qDebug() << "filename" << file.fileName(); |
|
qDebug() << "skip error"; |
|
qDebug() << "file size" << transfer->size; |
|
qDebug() << "transfered bytes" << transfer->transfered; |
|
} |
|
if (in.readRawData(buffer, header.length) == -1) { |
|
qDebug() << "Read error"; |
|
qDebug() << "buffer size" << header.length; |
|
qDebug() << "file size" << transfer->size; |
|
qDebug() << "transfered bytes" << transfer->transfered; |
|
} |
|
|
|
out.writeRawData((const char *)(buffer), header.length); |
|
transfer->transfered += header.length; |
|
delete[] buffer; |
|
|
|
if (transfer->transfered >= transfer->size) |
|
transfer->status = TS_CLOSING; |
|
|
|
if (transfer->transfered > transfer->size) { |
|
qDebug() << "overread?"; |
|
qDebug() << "file size" << transfer->size; |
|
qDebug() << "transfered bytes" << transfer->transfered; |
|
} |
|
|
|
emit uploadUpdated(transfer); |
|
} |
|
break; |
|
case TS_CLOSING: |
|
header.type = HT_CLOSE; |
|
header.id = transfer->id; |
|
header.length = 0; |
|
|
|
out.writeRawData((const char *)(&header), sizeof(TransferHeader)); |
|
|
|
transfer->status = TS_FINISHED; |
|
|
|
m_Uploads.dequeue(); |
|
emit uploadUpdated(transfer); |
|
break; |
|
default: |
|
break; |
|
} |
|
|
|
if (!block.isEmpty()) |
|
m_Socket->write(block); |
|
} |
|
|
|
/**
 * Tries to read the next TransferHeader from the socket into m_LastHeader.
 *
 * m_LastHeaderValid is set to true only when a complete header was available
 * and its descriptor equals DESCRIPTOR_STRING. When fewer bytes than one
 * header are available nothing is consumed and the flag is cleared.
 *
 * NOTE(review): the descriptor is compared with qstrcmp on bytes received from
 * the peer - confirm that the field is always null-terminated.
 *
 * @param in data stream reading from m_Socket
 */
void CWorker::readHeader(QDataStream & in) {
	const qint64 headerSize = (qint64)(sizeof(TransferHeader));

	if (m_Socket->bytesAvailable() < headerSize) {
		m_LastHeaderValid = false;
		return;
	}

	in.readRawData(reinterpret_cast<char *>(&m_LastHeader), sizeof(TransferHeader));
	m_LastHeaderValid = (qstrcmp(m_LastHeader.descriptor, DESCRIPTOR_STRING) == 0);

	if (m_LastHeaderValid)
		return;

	qDebug() << "got invalid header data";
	qDebug() << "available Bytes" << m_Socket->bytesAvailable();
}
|
|
|
void CWorker::readData() { |
|
QDataStream in(m_Socket); |
|
in.setVersion(QDataStream::Qt_4_0); |
|
|
|
if (!m_LastHeaderValid) |
|
readHeader(in); |
|
|
|
TransferData * target = NULL; |
|
|
|
while (m_LastHeaderValid && m_Socket->bytesAvailable() >= m_LastHeader.length) { |
|
// qDebug() << "available Bytes" << m_Socket->bytesAvailable() << "length" << m_LastHeader.length; |
|
|
|
//look for existing id |
|
if (m_LastHeader.type == HT_ACK) { |
|
qDebug() << "got HT_ACK .. scheduling upload to start"; |
|
startUploadData(m_LastHeader.id); |
|
} |
|
else { |
|
if (m_DownloadDict.contains(m_LastHeader.id)) { |
|
if (m_LastHeader.type == HT_CLOSE) { |
|
//finished or canceled by peer |
|
|
|
qDebug() << "download closed py peer"; |
|
TransferData * closed = m_DownloadDict.take(m_LastHeader.id); |
|
closed->status = TS_FINISHED; |
|
emit downloadRemoved(closed); |
|
} |
|
else |
|
//assign target |
|
target = m_DownloadDict[m_LastHeader.id]; |
|
} |
|
//id not in dict |
|
else if (m_LastHeader.type == HT_ANNOUNCE) { |
|
//new file announced |
|
qDebug() << "download announced"; |
|
TransferData * newFile = new TransferData; |
|
m_DownloadDict.insert(m_LastHeader.id, newFile); |
|
newFile->transfered = 0; |
|
newFile->size = 0; |
|
newFile->localFile = false; |
|
newFile->status = TS_WAITING; |
|
newFile->fileDir = m_DestinationDir; |
|
target = newFile; |
|
} |
|
|
|
//check valid target and data available |
|
if (target && m_LastHeader.length) { |
|
switch (m_LastHeader.type) { |
|
case HT_ANNOUNCE: |
|
qDebug() << "reading size.."; |
|
in >> target->size; |
|
qDebug() << "got size:" << target->size; |
|
qDebug() << "reading filename.."; |
|
in >> target->fileName; |
|
qDebug() << "got filename:" << target->fileName; |
|
emit downloadUpdated(target); |
|
qDebug() << "trying to send HT_ACK"; |
|
|
|
//TODO better do this async |
|
sendAcknowledge(m_LastHeader.id); |
|
break; |
|
case HT_DATA: |
|
{ |
|
// qDebug() << "got data" << target->transfered << '/' << target->size; |
|
QFile file(target->fileDir + QDir::separator() + target->fileName); |
|
if (target->status == TS_WAITING) { |
|
target->status = TS_TRANSFERING; |
|
emit downloadAdded(target); |
|
file.open(QIODevice::WriteOnly | QIODevice::Truncate); |
|
} |
|
else if (target->status == TS_TRANSFERING) |
|
file.open(QIODevice::WriteOnly | QIODevice::Append); |
|
else { |
|
in.skipRawData(m_LastHeader.length); |
|
qDebug() << "transfer not ready to recieve data. status is" << target->status; |
|
qDebug() << "skipping" << m_LastHeader.length << "bytes of data"; |
|
break; |
|
} |
|
|
|
char * buffer = new char[m_LastHeader.length]; |
|
if (in.readRawData(buffer, m_LastHeader.length) == -1) { |
|
qDebug() << "error reading from socket"; |
|
qDebug() << "length" << m_LastHeader.length; |
|
qDebug() << "available Bytes" << m_Socket->bytesAvailable(); |
|
} |
|
|
|
QDataStream out(&file); |
|
out.writeRawData((const char *)(buffer), m_LastHeader.length); |
|
target->transfered += m_LastHeader.length; |
|
delete[] buffer; |
|
|
|
emit downloadUpdated(target); |
|
} |
|
break; |
|
default: |
|
in.skipRawData(m_LastHeader.length); |
|
qDebug() << "unknown header"; |
|
qDebug() << "skipping" << m_LastHeader.length << "bytes of data"; |
|
break; |
|
} |
|
} |
|
} |
|
|
|
readHeader(in); |
|
} |
|
} |
|
|
|
void CWorker::writePing() { |
|
qDebug() << "write ping... thread id" << QThread::currentThreadId(); |
|
if (m_IdCounter > 10) |
|
return; |
|
|
|
QByteArray block; |
|
QDataStream out(&block, QIODevice::WriteOnly); |
|
out.setVersion(QDataStream::Qt_4_0); |
|
|
|
TransferHeader header; |
|
|
|
header.type = HT_ACK; |
|
header.id = m_IdCounter++; |
|
header.length = 0; |
|
|
|
out.writeRawData((const char *)(&header), sizeof(TransferHeader)); |
|
|
|
qDebug() << "sending Ping..." << header.id; |
|
if (m_Socket->write(block) == -1) |
|
qDebug() << "error"; |
|
else |
|
qDebug() << "done"; |
|
} |
|
|
|
void CWorker::readPing() { |
|
qDebug() << "read ping... thread id" << QThread::currentThreadId(); |
|
|
|
QDataStream in(m_Socket); |
|
in.setVersion(QDataStream::Qt_4_0); |
|
|
|
do { |
|
if (m_Socket->bytesAvailable() >= (qint64)(sizeof(TransferHeader))) { |
|
in.readRawData((char *)(&m_LastHeader), sizeof(TransferHeader)); |
|
m_LastHeaderValid = (qstrcmp(m_LastHeader.descriptor, DESCRIPTOR_STRING) == 0); |
|
|
|
if (m_LastHeaderValid) |
|
qDebug() << "got valid ping" << m_LastHeader.id; |
|
else |
|
qDebug() << "got invalid ping" << m_LastHeader.id; |
|
|
|
writePing(); |
|
} |
|
else |
|
m_LastHeaderValid = false; |
|
} |
|
while (m_LastHeaderValid); |
|
} |
|
|
|
void CWorker::error(QAbstractSocket::SocketError) { |
|
qDebug() << m_Socket->errorString(); |
|
} |
|
}
|
|
|