You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

cworker.cpp 12 KiB

11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago
11 years ago

  1. /***************************************************************************
  2. * Copyright (C) 2008 by Oliver Groß *
  3. * z.o.gross@gmx.de *
  4. * *
  5. * This program is free software; you can redistribute it and/or modify *
  6. * it under the terms of the GNU General Public License as published by *
  7. * the Free Software Foundation; either version 2 of the License, or *
  8. * (at your option) any later version. *
  9. * *
  10. * This program is distributed in the hope that it will be useful, *
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of *
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
  13. * GNU General Public License for more details. *
  14. * *
  15. * You should have received a copy of the GNU General Public License *
  16. * along with this program; if not, write to the *
  17. * Free Software Foundation, Inc., *
  18. * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
  19. ***************************************************************************/
  #include "cworker.h"
  #include <QTcpSocket>
  #include <QByteArray>
  #include <QFile>
  #include <QFileInfo>
  #include <QDir>
  #include <QThread>
  26. namespace qftrans {
  27. CWorker::CWorker(QTcpSocket * socket, QObject * parent) : QObject(parent), m_Socket(socket), m_IdCounter(0) {
  28. connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)));
  29. connect(m_Socket, SIGNAL(bytesWritten(qint64)), this, SLOT(writeData()), Qt::QueuedConnection);
  30. connect(m_Socket, SIGNAL(readyRead()), this, SLOT(readData()), Qt::QueuedConnection);
  31. qDebug() << "worker created";
  32. }
  33. CWorker::~CWorker() {
  34. qDebug() << "worker removed";
  35. }
  36. //TODO addUpload
  37. void CWorker::addUpload(TransferData * transfer) {
  38. transfer->status = TS_PENDING;
  39. if (!m_Uploads.contains(transfer)) {
  40. qDebug() << "adding upload";
  41. transfer->id = m_IdCounter;
  42. m_IdCounter++;
  43. m_Uploads.enqueue(transfer);
  44. }
  45. else
  46. qDebug() << "(re)adding upload";
  47. if (m_Socket->bytesToWrite() == 0) {
  48. qDebug() << "executing upload";
  49. writeData();
  50. }
  51. }
  52. //TODO removeUpload
  53. void CWorker::removeUpload(TransferData * transfer) {
  54. if (m_Uploads.contains(transfer) && transfer->status > TS_PENDING) {
  55. transfer->status = TS_CLOSING;
  56. if (m_Socket->bytesToWrite() == 0) {
  57. qDebug() << "executing closedown";
  58. writeData();
  59. }
  60. }
  61. }
  62. //TODO ? startUpload
  63. // void CWorker::startUpload(TransferData * transfer) {
  64. // if (m_Uploads.contains(transfer) && transfer->status != TS_PENDING)
  65. // transfer->status = TS_PENDING;
  66. //
  67. // if (m_Socket->bytesToWrite() == 0) {
  68. // qDebug() << "executing upload";
  69. // writeData();
  70. // }
  71. // }
  72. void CWorker::startUploadData(quint32 id) {
  73. if (m_Uploads.isEmpty())
  74. return;
  75. TransferData * transfer = m_Uploads.head(); /*m_UploadDict.key(id);*/
  76. if (transfer->id != id) {
  77. qDebug() << "got invalid id:" << id << "... expected:" << transfer->id;
  78. return;
  79. }
  80. qDebug() << "scheduling upload for transfer";
  81. transfer->status = TS_TRANSFERING;
  82. if (m_Socket->bytesToWrite() == 0) {
  83. qDebug() << "executing upload";
  84. writeData();
  85. }
  86. }
  87. void CWorker::sendAcknowledge(quint32 id) {
  88. QByteArray block;
  89. QDataStream out(&block, QIODevice::WriteOnly);
  90. out.setVersion(QDataStream::Qt_4_0);
  91. TransferHeader header;
  92. header.type = HT_ACK;
  93. header.id = id;
  94. header.length = 0;
  95. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  96. qDebug() << "sending HT_ACK" << id;
  97. m_Socket->write(block);
  98. qDebug() << "sent HT_ACK" << id;
  99. }
  100. //TODO CTransferThread::removeDownload
  101. void CWorker::removeDownload(TransferData * /*transfer*/) {
  102. }
  103. void CWorker::setDestinationDir(QString & value) {
  104. m_DestinationDir = value;
  105. }
  106. void CWorker::writeData() {
  107. if (m_Uploads.isEmpty()) {
  108. // if (m_UploadDict.isEmpty()) {
  109. qDebug() << "no more uploads.";
  110. return;
  111. }
  112. TransferData * transfer = m_Uploads.head();
  113. if (transfer->status == TS_PENDING)
  114. transfer->status = TS_ANNOUNCING;
  115. QByteArray block;
  116. QDataStream out(&block, QIODevice::ReadWrite);
  117. out.setVersion(QDataStream::Qt_4_0);
  118. TransferHeader header;
  119. switch (transfer->status) {
  120. case TS_ANNOUNCING:
  121. {
  122. qDebug() << "announcing" << transfer->id;
  123. if (transfer->transfered)
  124. transfer->transfered = 0;
  125. qint64 oldPosition = block.size();
  126. //fake write filename
  127. out << transfer->fileName;
  128. out.device()->seek(oldPosition);
  129. // ANNOUNCE header with size and filename
  130. header.type = HT_ANNOUNCE;
  131. header.id = transfer->id;
  132. header.length = quint32(block.size() - oldPosition) + sizeof(qint64);
  133. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  134. qDebug() << "sending size" << transfer->size;
  135. out << transfer->size;
  136. qDebug() << "sending filename" << transfer->fileName;
  137. out << transfer->fileName;
  138. transfer->status = TS_WAITING;
  139. }
  140. break;
  141. case TS_TRANSFERING:
  142. {
  143. header.type = HT_DATA;
  144. header.id = transfer->id;
  145. if (transfer->transfered + DATA_BLOCK_SIZE < transfer->size)
  146. header.length = DATA_BLOCK_SIZE;
  147. else
  148. header.length = transfer->size - transfer->transfered;
  149. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  150. char * buffer = new char[header.length];
  151. QFile file(transfer->fileDir + QDir::separator() + transfer->fileName);
  152. file.open(QIODevice::ReadOnly);
  153. QDataStream in(&file);
  154. if (in.skipRawData(transfer->transfered) == -1) {
  155. qDebug() << "filename" << file.fileName();
  156. qDebug() << "skip error";
  157. qDebug() << "file size" << transfer->size;
  158. qDebug() << "transfered bytes" << transfer->transfered;
  159. }
  160. if (in.readRawData(buffer, header.length) == -1) {
  161. qDebug() << "Read error";
  162. qDebug() << "buffer size" << header.length;
  163. qDebug() << "file size" << transfer->size;
  164. qDebug() << "transfered bytes" << transfer->transfered;
  165. }
  166. out.writeRawData((const char *)(buffer), header.length);
  167. transfer->transfered += header.length;
  168. delete[] buffer;
  169. if (transfer->transfered >= transfer->size)
  170. transfer->status = TS_CLOSING;
  171. if (transfer->transfered > transfer->size) {
  172. qDebug() << "overread?";
  173. qDebug() << "file size" << transfer->size;
  174. qDebug() << "transfered bytes" << transfer->transfered;
  175. }
  176. emit uploadUpdated(transfer);
  177. }
  178. break;
  179. case TS_CLOSING:
  180. header.type = HT_CLOSE;
  181. header.id = transfer->id;
  182. header.length = 0;
  183. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  184. transfer->status = TS_FINISHED;
  185. m_Uploads.dequeue();
  186. emit uploadUpdated(transfer);
  187. break;
  188. default:
  189. break;
  190. }
  191. if (!block.isEmpty())
  192. m_Socket->write(block);
  193. }
  194. void CWorker::readHeader(QDataStream & in) {
  195. if (m_Socket->bytesAvailable() >= (qint64)(sizeof(TransferHeader))) {
  196. in.readRawData((char *)(&m_LastHeader), sizeof(TransferHeader));
  197. m_LastHeaderValid = (qstrcmp(m_LastHeader.descriptor, DESCRIPTOR_STRING) == 0);
  198. if (!m_LastHeaderValid) {
  199. qDebug() << "got invalid header data";
  200. qDebug() << "available Bytes" << m_Socket->bytesAvailable();
  201. }
  202. }
  203. else
  204. m_LastHeaderValid = false;
  205. }
  206. void CWorker::readData() {
  207. QDataStream in(m_Socket);
  208. in.setVersion(QDataStream::Qt_4_0);
  209. if (!m_LastHeaderValid)
  210. readHeader(in);
  211. TransferData * target = NULL;
  212. while (m_LastHeaderValid && m_Socket->bytesAvailable() >= m_LastHeader.length) {
  213. // qDebug() << "available Bytes" << m_Socket->bytesAvailable() << "length" << m_LastHeader.length;
  214. //look for existing id
  215. if (m_LastHeader.type == HT_ACK) {
  216. qDebug() << "got HT_ACK .. scheduling upload to start";
  217. startUploadData(m_LastHeader.id);
  218. }
  219. else {
  220. if (m_DownloadDict.contains(m_LastHeader.id)) {
  221. if (m_LastHeader.type == HT_CLOSE) {
  222. //finished or canceled by peer
  223. qDebug() << "download closed py peer";
  224. TransferData * closed = m_DownloadDict.take(m_LastHeader.id);
  225. closed->status = TS_FINISHED;
  226. emit downloadRemoved(closed);
  227. }
  228. else
  229. //assign target
  230. target = m_DownloadDict[m_LastHeader.id];
  231. }
  232. //id not in dict
  233. else if (m_LastHeader.type == HT_ANNOUNCE) {
  234. //new file announced
  235. qDebug() << "download announced";
  236. TransferData * newFile = new TransferData;
  237. m_DownloadDict.insert(m_LastHeader.id, newFile);
  238. newFile->transfered = 0;
  239. newFile->size = 0;
  240. newFile->localFile = false;
  241. newFile->status = TS_WAITING;
  242. newFile->fileDir = m_DestinationDir;
  243. target = newFile;
  244. }
  245. //check valid target and data available
  246. if (target && m_LastHeader.length) {
  247. switch (m_LastHeader.type) {
  248. case HT_ANNOUNCE:
  249. qDebug() << "reading size..";
  250. in >> target->size;
  251. qDebug() << "got size:" << target->size;
  252. qDebug() << "reading filename..";
  253. in >> target->fileName;
  254. qDebug() << "got filename:" << target->fileName;
  255. emit downloadUpdated(target);
  256. qDebug() << "trying to send HT_ACK";
  257. //TODO better do this async
  258. sendAcknowledge(m_LastHeader.id);
  259. break;
  260. case HT_DATA:
  261. {
  262. // qDebug() << "got data" << target->transfered << '/' << target->size;
  263. QFile file(target->fileDir + QDir::separator() + target->fileName);
  264. if (target->status == TS_WAITING) {
  265. target->status = TS_TRANSFERING;
  266. emit downloadAdded(target);
  267. file.open(QIODevice::WriteOnly | QIODevice::Truncate);
  268. }
  269. else if (target->status == TS_TRANSFERING)
  270. file.open(QIODevice::WriteOnly | QIODevice::Append);
  271. else {
  272. in.skipRawData(m_LastHeader.length);
  273. qDebug() << "transfer not ready to recieve data. status is" << target->status;
  274. qDebug() << "skipping" << m_LastHeader.length << "bytes of data";
  275. break;
  276. }
  277. char * buffer = new char[m_LastHeader.length];
  278. if (in.readRawData(buffer, m_LastHeader.length) == -1) {
  279. qDebug() << "error reading from socket";
  280. qDebug() << "length" << m_LastHeader.length;
  281. qDebug() << "available Bytes" << m_Socket->bytesAvailable();
  282. }
  283. QDataStream out(&file);
  284. out.writeRawData((const char *)(buffer), m_LastHeader.length);
  285. target->transfered += m_LastHeader.length;
  286. delete[] buffer;
  287. emit downloadUpdated(target);
  288. }
  289. break;
  290. default:
  291. in.skipRawData(m_LastHeader.length);
  292. qDebug() << "unknown header";
  293. qDebug() << "skipping" << m_LastHeader.length << "bytes of data";
  294. break;
  295. }
  296. }
  297. }
  298. readHeader(in);
  299. }
  300. }
  301. void CWorker::writePing() {
  302. qDebug() << "write ping... thread id" << QThread::currentThreadId();
  303. if (m_IdCounter > 10)
  304. return;
  305. QByteArray block;
  306. QDataStream out(&block, QIODevice::WriteOnly);
  307. out.setVersion(QDataStream::Qt_4_0);
  308. TransferHeader header;
  309. header.type = HT_ACK;
  310. header.id = m_IdCounter++;
  311. header.length = 0;
  312. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  313. qDebug() << "sending Ping..." << header.id;
  314. if (m_Socket->write(block) == -1)
  315. qDebug() << "error";
  316. else
  317. qDebug() << "done";
  318. }
  319. void CWorker::readPing() {
  320. qDebug() << "read ping... thread id" << QThread::currentThreadId();
  321. QDataStream in(m_Socket);
  322. in.setVersion(QDataStream::Qt_4_0);
  323. do {
  324. if (m_Socket->bytesAvailable() >= (qint64)(sizeof(TransferHeader))) {
  325. in.readRawData((char *)(&m_LastHeader), sizeof(TransferHeader));
  326. m_LastHeaderValid = (qstrcmp(m_LastHeader.descriptor, DESCRIPTOR_STRING) == 0);
  327. if (m_LastHeaderValid)
  328. qDebug() << "got valid ping" << m_LastHeader.id;
  329. else
  330. qDebug() << "got invalid ping" << m_LastHeader.id;
  331. writePing();
  332. }
  333. else
  334. m_LastHeaderValid = false;
  335. }
  336. while (m_LastHeaderValid);
  337. }
  338. void CWorker::error(QAbstractSocket::SocketError) {
  339. qDebug() << m_Socket->errorString();
  340. }
  341. }