You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

409 lines
12KB

  1. /***************************************************************************
  2. * Copyright (C) 2008 by Oliver Groß *
  3. * z.o.gross@gmx.de *
  4. * *
  5. * This program is free software; you can redistribute it and/or modify *
  6. * it under the terms of the GNU General Public License as published by *
  7. * the Free Software Foundation; either version 2 of the License, or *
  8. * (at your option) any later version. *
  9. * *
  10. * This program is distributed in the hope that it will be useful, *
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of *
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
  13. * GNU General Public License for more details. *
  14. * *
  15. * You should have received a copy of the GNU General Public License *
  16. * along with this program; if not, write to the *
  17. * Free Software Foundation, Inc., *
  18. * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. *
  19. ***************************************************************************/
  20. #include "cworker.h"
  21. #include <QTcpSocket>
  22. #include <QByteArray>
  23. #include <QFile>
  24. #include <QDir>
  25. #include <QThread>
  26. namespace qftrans {
  27. CWorker::CWorker(QTcpSocket * socket, QObject * parent) : QObject(parent), m_Socket(socket), m_IdCounter(0) {
  28. connect(m_Socket, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(error(QAbstractSocket::SocketError)));
  29. connect(m_Socket, SIGNAL(bytesWritten(qint64)), this, SLOT(writeData()), Qt::QueuedConnection);
  30. connect(m_Socket, SIGNAL(readyRead()), this, SLOT(readData()), Qt::QueuedConnection);
  31. qDebug() << "worker created";
  32. }
  33. CWorker::~CWorker() {
  34. qDebug() << "worker removed";
  35. }
  36. //TODO addUpload
  37. void CWorker::addUpload(TransferData * transfer) {
  38. transfer->status = TS_PENDING;
  39. if (!m_Uploads.contains(transfer)) {
  40. qDebug() << "adding upload";
  41. transfer->id = m_IdCounter;
  42. m_IdCounter++;
  43. m_Uploads.enqueue(transfer);
  44. }
  45. else
  46. qDebug() << "(re)adding upload";
  47. if (m_Socket->bytesToWrite() == 0) {
  48. qDebug() << "executing upload";
  49. writeData();
  50. }
  51. }
  52. //TODO removeUpload
  53. void CWorker::removeUpload(TransferData * transfer) {
  54. if (m_Uploads.contains(transfer) && transfer->status > TS_PENDING) {
  55. transfer->status = TS_CLOSING;
  56. if (m_Socket->bytesToWrite() == 0) {
  57. qDebug() << "executing closedown";
  58. writeData();
  59. }
  60. }
  61. }
  62. //TODO ? startUpload
  63. // void CWorker::startUpload(TransferData * transfer) {
  64. // if (m_Uploads.contains(transfer) && transfer->status != TS_PENDING)
  65. // transfer->status = TS_PENDING;
  66. //
  67. // if (m_Socket->bytesToWrite() == 0) {
  68. // qDebug() << "executing upload";
  69. // writeData();
  70. // }
  71. // }
  72. void CWorker::startUploadData(quint32 id) {
  73. if (m_Uploads.isEmpty())
  74. return;
  75. TransferData * transfer = m_Uploads.head(); /*m_UploadDict.key(id);*/
  76. if (transfer->id != id) {
  77. qDebug() << "got invalid id:" << id << "... expected:" << transfer->id;
  78. return;
  79. }
  80. qDebug() << "scheduling upload for transfer";
  81. transfer->status = TS_TRANSFERING;
  82. if (m_Socket->bytesToWrite() == 0) {
  83. qDebug() << "executing upload";
  84. writeData();
  85. }
  86. }
  87. void CWorker::sendAcknowledge(quint32 id) {
  88. QByteArray block;
  89. QDataStream out(&block, QIODevice::WriteOnly);
  90. out.setVersion(QDataStream::Qt_4_0);
  91. TransferHeader header;
  92. header.type = HT_ACK;
  93. header.id = id;
  94. header.length = 0;
  95. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  96. qDebug() << "sending HT_ACK" << id;
  97. m_Socket->write(block);
  98. qDebug() << "sent HT_ACK" << id;
  99. }
  100. //TODO CTransferThread::removeDownload
  101. void CWorker::removeDownload(TransferData * /*transfer*/) {
  102. }
  103. void CWorker::setDestinationDir(QString & value) {
  104. m_DestinationDir = value;
  105. }
  106. void CWorker::writeData() {
  107. if (m_Uploads.isEmpty()) {
  108. // if (m_UploadDict.isEmpty()) {
  109. qDebug() << "no more uploads.";
  110. return;
  111. }
  112. TransferData * transfer = m_Uploads.head();
  113. if (transfer->status == TS_PENDING)
  114. transfer->status = TS_ANNOUNCING;
  115. QByteArray block;
  116. QDataStream out(&block, QIODevice::ReadWrite);
  117. out.setVersion(QDataStream::Qt_4_0);
  118. TransferHeader header;
  119. switch (transfer->status) {
  120. case TS_ANNOUNCING:
  121. {
  122. qDebug() << "announcing" << transfer->id;
  123. if (transfer->transfered)
  124. transfer->transfered = 0;
  125. qint64 oldPosition = block.size();
  126. //fake write filename
  127. out << transfer->fileName;
  128. out.device()->seek(oldPosition);
  129. // ANNOUNCE header with size and filename
  130. header.type = HT_ANNOUNCE;
  131. header.id = transfer->id;
  132. header.length = quint32(block.size() - oldPosition) + sizeof(qint64);
  133. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  134. qDebug() << "sending size" << transfer->size;
  135. out << transfer->size;
  136. qDebug() << "sending filename" << transfer->fileName;
  137. out << transfer->fileName;
  138. transfer->status = TS_WAITING;
  139. }
  140. break;
  141. case TS_TRANSFERING:
  142. {
  143. header.type = HT_DATA;
  144. header.id = transfer->id;
  145. if (transfer->transfered + DATA_BLOCK_SIZE < transfer->size)
  146. header.length = DATA_BLOCK_SIZE;
  147. else
  148. header.length = transfer->size - transfer->transfered;
  149. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  150. char * buffer = new char[header.length];
  151. QFile file(transfer->fileDir + QDir::separator() + transfer->fileName);
  152. file.open(QIODevice::ReadOnly);
  153. QDataStream in(&file);
  154. if (in.skipRawData(transfer->transfered) == -1) {
  155. qDebug() << "filename" << file.fileName();
  156. qDebug() << "skip error";
  157. qDebug() << "file size" << transfer->size;
  158. qDebug() << "transfered bytes" << transfer->transfered;
  159. }
  160. if (in.readRawData(buffer, header.length) == -1) {
  161. qDebug() << "Read error";
  162. qDebug() << "buffer size" << header.length;
  163. qDebug() << "file size" << transfer->size;
  164. qDebug() << "transfered bytes" << transfer->transfered;
  165. }
  166. out.writeRawData((const char *)(buffer), header.length);
  167. transfer->transfered += header.length;
  168. delete[] buffer;
  169. if (transfer->transfered >= transfer->size)
  170. transfer->status = TS_CLOSING;
  171. if (transfer->transfered > transfer->size) {
  172. qDebug() << "overread?";
  173. qDebug() << "file size" << transfer->size;
  174. qDebug() << "transfered bytes" << transfer->transfered;
  175. }
  176. emit uploadUpdated(transfer);
  177. }
  178. break;
  179. case TS_CLOSING:
  180. header.type = HT_CLOSE;
  181. header.id = transfer->id;
  182. header.length = 0;
  183. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  184. transfer->status = TS_FINISHED;
  185. m_Uploads.dequeue();
  186. emit uploadUpdated(transfer);
  187. break;
  188. default:
  189. break;
  190. }
  191. if (!block.isEmpty())
  192. m_Socket->write(block);
  193. }
  194. void CWorker::readHeader(QDataStream & in) {
  195. if (m_Socket->bytesAvailable() >= (qint64)(sizeof(TransferHeader))) {
  196. in.readRawData((char *)(&m_LastHeader), sizeof(TransferHeader));
  197. m_LastHeaderValid = (qstrcmp(m_LastHeader.descriptor, DESCRIPTOR_STRING) == 0);
  198. if (!m_LastHeaderValid) {
  199. qDebug() << "got invalid header data";
  200. qDebug() << "available Bytes" << m_Socket->bytesAvailable();
  201. }
  202. }
  203. else
  204. m_LastHeaderValid = false;
  205. }
  206. void CWorker::readData() {
  207. QDataStream in(m_Socket);
  208. in.setVersion(QDataStream::Qt_4_0);
  209. if (!m_LastHeaderValid)
  210. readHeader(in);
  211. TransferData * target = NULL;
  212. while (m_LastHeaderValid && m_Socket->bytesAvailable() >= m_LastHeader.length) {
  213. // qDebug() << "available Bytes" << m_Socket->bytesAvailable() << "length" << m_LastHeader.length;
  214. //look for existing id
  215. if (m_LastHeader.type == HT_ACK) {
  216. qDebug() << "got HT_ACK .. scheduling upload to start";
  217. startUploadData(m_LastHeader.id);
  218. }
  219. else {
  220. if (m_DownloadDict.contains(m_LastHeader.id)) {
  221. if (m_LastHeader.type == HT_CLOSE) {
  222. //finished or canceled by peer
  223. qDebug() << "download closed py peer";
  224. TransferData * closed = m_DownloadDict.take(m_LastHeader.id);
  225. closed->status = TS_FINISHED;
  226. emit downloadRemoved(closed);
  227. }
  228. else
  229. //assign target
  230. target = m_DownloadDict[m_LastHeader.id];
  231. }
  232. //id not in dict
  233. else if (m_LastHeader.type == HT_ANNOUNCE) {
  234. //new file announced
  235. qDebug() << "download announced";
  236. TransferData * newFile = new TransferData;
  237. m_DownloadDict.insert(m_LastHeader.id, newFile);
  238. newFile->transfered = 0;
  239. newFile->size = 0;
  240. newFile->localFile = false;
  241. newFile->status = TS_WAITING;
  242. newFile->fileDir = m_DestinationDir;
  243. target = newFile;
  244. }
  245. //check valid target and data available
  246. if (target && m_LastHeader.length) {
  247. switch (m_LastHeader.type) {
  248. case HT_ANNOUNCE:
  249. qDebug() << "reading size..";
  250. in >> target->size;
  251. qDebug() << "got size:" << target->size;
  252. qDebug() << "reading filename..";
  253. in >> target->fileName;
  254. qDebug() << "got filename:" << target->fileName;
  255. emit downloadUpdated(target);
  256. qDebug() << "trying to send HT_ACK";
  257. //TODO better do this async
  258. sendAcknowledge(m_LastHeader.id);
  259. break;
  260. case HT_DATA:
  261. {
  262. // qDebug() << "got data" << target->transfered << '/' << target->size;
  263. QFile file(target->fileDir + QDir::separator() + target->fileName);
  264. if (target->status == TS_WAITING) {
  265. target->status = TS_TRANSFERING;
  266. emit downloadAdded(target);
  267. file.open(QIODevice::WriteOnly | QIODevice::Truncate);
  268. }
  269. else if (target->status == TS_TRANSFERING)
  270. file.open(QIODevice::WriteOnly | QIODevice::Append);
  271. else {
  272. in.skipRawData(m_LastHeader.length);
  273. qDebug() << "transfer not ready to recieve data. status is" << target->status;
  274. qDebug() << "skipping" << m_LastHeader.length << "bytes of data";
  275. break;
  276. }
  277. char * buffer = new char[m_LastHeader.length];
  278. if (in.readRawData(buffer, m_LastHeader.length) == -1) {
  279. qDebug() << "error reading from socket";
  280. qDebug() << "length" << m_LastHeader.length;
  281. qDebug() << "available Bytes" << m_Socket->bytesAvailable();
  282. }
  283. QDataStream out(&file);
  284. out.writeRawData((const char *)(buffer), m_LastHeader.length);
  285. target->transfered += m_LastHeader.length;
  286. delete[] buffer;
  287. emit downloadUpdated(target);
  288. }
  289. break;
  290. default:
  291. in.skipRawData(m_LastHeader.length);
  292. qDebug() << "unknown header";
  293. qDebug() << "skipping" << m_LastHeader.length << "bytes of data";
  294. break;
  295. }
  296. }
  297. }
  298. readHeader(in);
  299. }
  300. }
  301. void CWorker::writePing() {
  302. qDebug() << "write ping... thread id" << QThread::currentThreadId();
  303. if (m_IdCounter > 10)
  304. return;
  305. QByteArray block;
  306. QDataStream out(&block, QIODevice::WriteOnly);
  307. out.setVersion(QDataStream::Qt_4_0);
  308. TransferHeader header;
  309. header.type = HT_ACK;
  310. header.id = m_IdCounter++;
  311. header.length = 0;
  312. out.writeRawData((const char *)(&header), sizeof(TransferHeader));
  313. qDebug() << "sending Ping..." << header.id;
  314. if (m_Socket->write(block) == -1)
  315. qDebug() << "error";
  316. else
  317. qDebug() << "done";
  318. }
  319. void CWorker::readPing() {
  320. qDebug() << "read ping... thread id" << QThread::currentThreadId();
  321. QDataStream in(m_Socket);
  322. in.setVersion(QDataStream::Qt_4_0);
  323. do {
  324. if (m_Socket->bytesAvailable() >= (qint64)(sizeof(TransferHeader))) {
  325. in.readRawData((char *)(&m_LastHeader), sizeof(TransferHeader));
  326. m_LastHeaderValid = (qstrcmp(m_LastHeader.descriptor, DESCRIPTOR_STRING) == 0);
  327. if (m_LastHeaderValid)
  328. qDebug() << "got valid ping" << m_LastHeader.id;
  329. else
  330. qDebug() << "got invalid ping" << m_LastHeader.id;
  331. writePing();
  332. }
  333. else
  334. m_LastHeaderValid = false;
  335. }
  336. while (m_LastHeaderValid);
  337. }
  338. void CWorker::error(QAbstractSocket::SocketError) {
  339. qDebug() << m_Socket->errorString();
  340. }
  341. }