/*************************************************************************** * Copyright (C) 2008 by Oliver Groß * * z.o.gross@gmx.de * * * * This program is free software; you can redistribute it and/or modify * * it under the terms of the GNU General Public License as published by * * the Free Software Foundation; either version 2 of the License, or * * (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the * * Free Software Foundation, Inc., * * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * ***************************************************************************/ #include "cworker.h" #include #include #include #include #include namespace qftrans { CWorker::CWorker(QTcpSocket * socket, QObject * parent) : QObject(parent), m_Socket(socket), m_IdCounter(0) { connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError))); connect(m_Socket, SIGNAL(bytesWritten(qint64)), this, SLOT(writeData()), Qt::QueuedConnection); connect(m_Socket, SIGNAL(readyRead()), this, SLOT(readData()), Qt::QueuedConnection); qDebug() << "worker created"; } CWorker::~CWorker() { qDebug() << "worker removed"; } //TODO addUpload void CWorker::addUpload(TransferData * transfer) { if (!m_UploadDict.contains(transfer)) { qDebug() << "adding upload"; transfer->status = TS_WAITING; m_UploadDict.insert(transfer, m_IdCounter); m_IdCounter++; } } //TODO removeUpload void CWorker::removeUpload(TransferData * transfer) { if (m_UploadDict.contains(transfer) && transfer->status != TS_WAITING) { transfer->status = TS_CLOSING; if (m_Socket->bytesToWrite() == 0) { qDebug() << "executing upload"; writeData(); } } } //TODO startUpload void CWorker::startUpload(TransferData * transfer) { if (m_UploadDict.contains(transfer) && transfer->status == TS_WAITING) { qDebug() << "scheduling upload for announce"; transfer->status = TS_ANNOUNCING; if (m_Socket->bytesToWrite() == 0) { qDebug() << "executing upload"; writeData(); } } } void CWorker::startUploadData(quint32 id) { TransferData * transfer = m_UploadDict.key(id); if (transfer) { qDebug() << "scheduling upload for transfer"; transfer->status = TS_TRANSFERING; if (m_Socket->bytesToWrite() == 0) { qDebug() << "executing upload"; writeData(); } } } void CWorker::sendAcknowledge(quint32 id) { QByteArray block; QDataStream out(&block, QIODevice::WriteOnly); out.setVersion(QDataStream::Qt_4_0); TransferHeader header; // header.descriptor = DESCRIPTOR_STRING; header.type = HT_ACK; header.id = id; header.length = 0; out.writeRawData((const char *)(&header), sizeof(TransferHeader)); qDebug() << "sending HT_ACK" << id; m_Socket->write(block); qDebug() << "sent HT_ACK" << id; } //TODO CTransferThread::removeDownload void CWorker::removeDownload(TransferData * /*transfer*/) { } void CWorker::setDestinationDir(QString & value) { m_DestinationDir = value; } void CWorker::writeData() { if (m_UploadDict.isEmpty()) { qDebug() << "no more uploads."; return; } QByteArray block; QDataStream out(&block, QIODevice::ReadWrite); out.setVersion(QDataStream::Qt_4_0); QTransferDataList transfers = m_UploadDict.keys(); QTransferDataList::iterator i; for (i = transfers.begin(); i != transfers.end(); ++i) { block.clear(); TransferHeader header; switch ((*i)->status) { case TS_ANNOUNCING: { qDebug() << "announcing" << m_UploadDict[*i]; if ((*i)->transfered) (*i)->transfered = 0; // ID header with no data header.type = HT_ID; header.id = m_UploadDict[*i]; header.length = 0; out.writeRawData((const char *)(&header), sizeof(TransferHeader)); //size header with size header.type = HT_SIZE; header.length = sizeof(qint64); out.writeRawData((const char *)(&header), sizeof(TransferHeader)); qDebug() << "sending size" << (*i)->size; out << (*i)->size; //name header with file name qint64 oldPosition = block.size(); //fake write filename out << (*i)->fileName; out.device()->seek(oldPosition); header.type = HT_NAME; header.length = quint32(block.size() - oldPosition); // qDebug() << "pos" << out.device()->pos() << block.size(); out.writeRawData((const char *)(&header), sizeof(TransferHeader)); // qDebug() << "pos post header" << out.device()->pos() << block.size(); // qint64 oldPosition = block.size() - sizeof(quint32); qDebug() << "sending filename" << (*i)->fileName; out << (*i)->fileName; // qDebug() << "pos post filename" << out.device()->pos() << block.size(); // out.device()->seek(oldPosition); // qDebug() << "pos post seek" << out.device()->pos(); /* qDebug() << "filename data length" << header.length; out << header.length; out.device()->seek(oldPosition); out >> header.length; qDebug() << "written length" << header.length; out.device()->seek(block.size()); qDebug() << "pos post length correct" << out.device()->pos() << block.size();*/ (*i)->status = TS_WAITING; } break; case TS_TRANSFERING: { // qDebug() << "transfering" << m_DataDict[*i]; header.type = HT_DATA; header.id = m_UploadDict[*i]; if ((*i)->transfered + DATA_BLOCK_SIZE < (*i)->size) header.length = DATA_BLOCK_SIZE; else header.length =(*i)->size - (*i)->transfered; out.writeRawData((const char *)(&header), sizeof(TransferHeader)); char * buffer = new char[header.length]; QFile file((*i)->fileDir + QDir::separator() + (*i)->fileName); file.open(QIODevice::ReadOnly); QDataStream in(&file); if (in.skipRawData((*i)->transfered) == -1) { qDebug() << "filename" << file.fileName(); qDebug() << "skip error"; qDebug() << "file size" << (*i)->size; qDebug() << "transfered bytes" << (*i)->transfered; } // in.device()->seek((*i)->transfered); if (in.readRawData(buffer, header.length) == -1) { qDebug() << "Read error"; qDebug() << "buffer size" << header.length; qDebug() << "file size" << (*i)->size; qDebug() << "transfered bytes" << (*i)->transfered; } out.writeRawData((const char *)(buffer), header.length); (*i)->transfered += header.length; delete[] buffer; if ((*i)->transfered >= (*i)->size) (*i)->status = TS_CLOSING; if ((*i)->transfered > (*i)->size) { qDebug() << "overread?"; qDebug() << "file size" << (*i)->size; qDebug() << "transfered bytes" << (*i)->transfered; } emit uploadUpdated(*i); } break; case TS_CLOSING: header.type = HT_CLOSE; header.id = m_UploadDict[*i]; header.length = 0; out.writeRawData((const char *)(&header), sizeof(TransferHeader)); (*i)->status = TS_FINISHED; m_UploadDict.remove(*i); break; default: break; } if (!block.isEmpty()) m_Socket->write(block); } } void CWorker::readHeader(QDataStream & in) { if (m_Socket->bytesAvailable() >= (qint64)(sizeof(TransferHeader))) { in.readRawData((char *)(&m_LastHeader), sizeof(TransferHeader)); m_LastHeaderValid = (qstrcmp(m_LastHeader.descriptor, DESCRIPTOR_STRING) == 0); if (!m_LastHeaderValid) { qDebug() << "got invalid header data"; qDebug() << "available Bytes" << m_Socket->bytesAvailable(); } } else m_LastHeaderValid = false; } void CWorker::readData() { QDataStream in(m_Socket); in.setVersion(QDataStream::Qt_4_0); if (!m_LastHeaderValid) readHeader(in); TransferData * target = NULL; while (m_LastHeaderValid && m_Socket->bytesAvailable() >= m_LastHeader.length) { // qDebug() << "available Bytes" << m_Socket->bytesAvailable() << "length" << m_LastHeader.length; //look for existing id if (m_LastHeader.type == HT_ACK) { qDebug() << "got HT_ACK .. scheduling upload to start"; startUploadData(m_LastHeader.id); } else { if (m_DownloadDict.contains(m_LastHeader.id)) { if (m_LastHeader.type == HT_CLOSE) { //finished or canceled by peer qDebug() << "download closed py peer"; TransferData * closed = m_DownloadDict.take(m_LastHeader.id); closed->status = TS_FINISHED; emit downloadRemoved(closed); } else //assign target target = m_DownloadDict[m_LastHeader.id]; } //id not in dict else if (m_LastHeader.type == HT_ID) { //new file announced qDebug() << "download announced"; TransferData * newFile = new TransferData; m_DownloadDict.insert(m_LastHeader.id, newFile); newFile->transfered = 0; newFile->size = 0; newFile->localFile = false; newFile->status = TS_WAITING; newFile->fileDir = m_DestinationDir; } //check valid target and data available if (target && m_LastHeader.length) { switch (m_LastHeader.type) { case HT_ID: target->status = TS_WAITING; // QFile::remove(target->fileDir + QDir::separator() + target->fileName); emit downloadUpdated(target); break; case HT_SIZE: qDebug() << "reading size.."; in >> target->size; qDebug() << "got size:" << target->size; break; case HT_NAME: qDebug() << "reading filename.."; in >> target->fileName; qDebug() << "got filename:" << target->fileName; qDebug() << "trying to send HT_ACK"; //TODO better do this async sendAcknowledge(m_LastHeader.id); break; case HT_DATA: { // qDebug() << "got data" << target->transfered << '/' << target->size; QFile file(target->fileDir + QDir::separator() + target->fileName); if (target->status == TS_WAITING) { target->status = TS_TRANSFERING; emit downloadAdded(target); file.open(QIODevice::WriteOnly | QIODevice::Truncate); } else if (target->status == TS_TRANSFERING) file.open(QIODevice::WriteOnly | QIODevice::Append); else { in.skipRawData(m_LastHeader.length); qDebug() << "transfer not ready to recieve data. status is" << target->status; qDebug() << "skipping" << m_LastHeader.length << "bytes of data"; break; } char * buffer = new char[m_LastHeader.length]; if (in.readRawData(buffer, m_LastHeader.length) == -1) { qDebug() << "error reading from socket"; qDebug() << "length" << m_LastHeader.length; qDebug() << "available Bytes" << m_Socket->bytesAvailable(); } QDataStream out(&file); out.writeRawData((const char *)(buffer), m_LastHeader.length); target->transfered += m_LastHeader.length; delete[] buffer; emit downloadUpdated(target); } break; default: in.skipRawData(m_LastHeader.length); qDebug() << "unknown header"; qDebug() << "skipping" << m_LastHeader.length << "bytes of data"; break; } } } readHeader(in); } } void CWorker::writePing() { qDebug() << "write ping... thread id" << QThread::currentThreadId(); if (m_IdCounter > 10) return; QByteArray block; QDataStream out(&block, QIODevice::WriteOnly); out.setVersion(QDataStream::Qt_4_0); TransferHeader header; header.type = HT_ACK; header.id = m_IdCounter++; header.length = 0; out.writeRawData((const char *)(&header), sizeof(TransferHeader)); qDebug() << "sending Ping..." << header.id; if (m_Socket->write(block) == -1) qDebug() << "error"; else qDebug() << "done"; } void CWorker::readPing() { qDebug() << "read ping... thread id" << QThread::currentThreadId(); QDataStream in(m_Socket); in.setVersion(QDataStream::Qt_4_0); do { if (m_Socket->bytesAvailable() >= (qint64)(sizeof(TransferHeader))) { in.readRawData((char *)(&m_LastHeader), sizeof(TransferHeader)); m_LastHeaderValid = (qstrcmp(m_LastHeader.descriptor, DESCRIPTOR_STRING) == 0); if (m_LastHeaderValid) qDebug() << "got valid ping" << m_LastHeader.id; else qDebug() << "got invalid ping" << m_LastHeader.id; writePing(); } else m_LastHeaderValid = false; } while (m_LastHeaderValid); } void CWorker::error(QAbstractSocket::SocketError) { qDebug() << m_Socket->errorString(); } }