CPPMyth
Library to interoperate with MythTV server
mythprotoplayback.cpp
1 /*
2  * Copyright (C) 2014 Jean-Luc Barriere
3  *
4  * This Program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2, or (at your option)
7  * any later version.
8  *
9  * This Program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; see the file COPYING. If not, write to
16  * the Free Software Foundation, 51 Franklin Street, Fifth Floor, Boston,
17  * MA 02110-1301 USA
18  * http://www.gnu.org/copyleft/gpl.html
19  *
20  */
21 
22 #include "mythprotoplayback.h"
23 #include "../private/debug.h"
24 #include "../private/socket.h"
25 #include "../private/os/threads/mutex.h"
26 #include "../private/builtin.h"
27 
28 #include <limits>
29 #include <cstdio>
30 
31 #ifdef __WINDOWS__
32 #include <Ws2tcpip.h>
33 #else
34 #include <sys/socket.h> // for recv
35 #include <sys/select.h> // for select
36 #endif /* __WINDOWS__ */
37 
38 using namespace Myth;
39 
44 
45 ProtoPlayback::ProtoPlayback(const std::string& server, unsigned port)
46 : ProtoBase(server, port)
47 {
48 }
49 
50 bool ProtoPlayback::Open()
51 {
52  bool ok = false;
53 
54  if (!OpenConnection(PROTO_PLAYBACK_RCVBUF))
55  return false;
56 
57  if (m_protoVersion >= 75)
58  ok = Announce75();
59 
60  if (ok)
61  return true;
62  Close();
63  return false;
64 }
65 
66 void ProtoPlayback::Close()
67 {
68  ProtoBase::Close();
69  // Clean hanging and disable retry
70  m_tainted = m_hang = false;
71 }
72 
73 bool ProtoPlayback::IsOpen()
74 {
75  // Try reconnect
76  if (m_hang)
77  return ProtoPlayback::Open();
78  return ProtoBase::IsOpen();
79 }
80 
81 bool ProtoPlayback::Announce75()
82 {
83  OS::CLockGuard lock(*m_mutex);
84 
85  std::string cmd("ANN Playback ");
86  cmd.append(m_socket->GetMyHostName()).append(" 0");
87  if (!SendCommand(cmd.c_str()))
88  return false;
89 
90  std::string field;
91  if (!ReadField(field) || !IsMessageOK(field))
92  goto out;
93  return true;
94 
95 out:
96  FlushMessage();
97  return false;
98 }
99 
100 void ProtoPlayback::TransferDone75(ProtoTransfer& transfer)
101 {
102  char buf[32];
103 
104  OS::CLockGuard lock(*m_mutex);
105  if (!transfer.IsOpen())
106  return;
107  std::string cmd("QUERY_FILETRANSFER ");
108  uint32_to_string(transfer.GetFileId(), buf);
109  cmd.append(buf).append(PROTO_STR_SEPARATOR).append("DONE");
110  if (SendCommand(cmd.c_str()))
111  {
112  std::string field;
113  if (!ReadField(field) || !IsMessageOK(field))
114  FlushMessage();
115  }
116 }
117 
118 bool ProtoPlayback::TransferIsOpen75(ProtoTransfer& transfer)
119 {
120  char buf[32];
121  std::string field;
122  int8_t status = 0;
123 
124  OS::CLockGuard lock(*m_mutex);
125  if (!IsOpen())
126  return false;
127  std::string cmd("QUERY_FILETRANSFER ");
128  uint32_to_string(transfer.GetFileId(), buf);
129  cmd.append(buf);
130  cmd.append(PROTO_STR_SEPARATOR);
131  cmd.append("IS_OPEN");
132 
133  if (!SendCommand(cmd.c_str()))
134  return false;
135  if (!ReadField(field) || 0 != string_to_int8(field.c_str(), &status))
136  {
137  FlushMessage();
138  return false;
139  }
140  if (status == 0)
141  return false;
142  return true;
143 }
144 
145 int ProtoPlayback::TransferRequestBlock(ProtoTransfer& transfer, void *buffer, unsigned n)
146 {
147  bool request = false, data = false;
148  int r = 0, nfds = 0, fdc, fdd;
149  char *p = (char*)buffer;
150  struct timeval tv;
151  fd_set fds;
152  unsigned s = 0;
153 
154  int64_t filePosition = transfer.GetPosition();
155  int64_t fileRequest = transfer.GetRequested();
156 
157  if (n == 0)
158  return n;
159 
160  fdc = GetSocket();
161  if (INVALID_SOCKET_VALUE == (net_socket_t)fdc)
162  return -1;
163  fdd = transfer.GetSocket();
164  if (INVALID_SOCKET_VALUE == (net_socket_t)fdd)
165  return -1;
166  // Max size is RCVBUF size
167  if (n > PROTO_TRANSFER_RCVBUF)
168  n = PROTO_TRANSFER_RCVBUF;
169  if ((filePosition + n) > fileRequest)
170  {
171  // Begin critical section
172  m_mutex->Lock();
173  bool ok = TransferRequestBlock75(transfer, n);
174  if (!ok)
175  {
176  m_mutex->Unlock();
177  goto err;
178  }
179  request = true;
180  }
181 
182  do
183  {
184  FD_ZERO(&fds);
185  if (request)
186  {
187  FD_SET((net_socket_t)fdc, &fds);
188  if (nfds < fdc)
189  nfds = fdc;
190  }
191  FD_SET((net_socket_t)fdd, &fds);
192  if (nfds < fdd)
193  nfds = fdd;
194 
195  if (data)
196  {
197  // Read directly to get all queued packets
198  tv.tv_sec = 0;
199  tv.tv_usec = 0;
200  }
201  else
202  {
203  // Wait and read for new packet
204  tv.tv_sec = 10;
205  tv.tv_usec = 0;
206  }
207 
208  r = select (nfds + 1, &fds, NULL, NULL, &tv);
209  if (r < 0)
210  {
211  DBG(DBG_ERROR, "%s: select error (%d)\n", __FUNCTION__, r);
212  goto err;
213  }
214  if (r == 0 && !data)
215  {
216  DBG(DBG_ERROR, "%s: select timeout\n", __FUNCTION__);
217  goto err;
218  }
219  // Check for data
220  data = false;
221  if (FD_ISSET((net_socket_t)fdd, &fds))
222  {
223  r = recv((net_socket_t)fdd, p, (size_t)(n - s), 0);
224  if (r < 0)
225  {
226  DBG(DBG_ERROR, "%s: recv data error (%d)\n", __FUNCTION__, r);
227  goto err;
228  }
229  if (r > 0)
230  {
231  data = true;
232  s += r;
233  p += r;
234  filePosition += r;
235  transfer.SetPosition(filePosition);
236  }
237  }
238  // Check for response of request
239  if (request && FD_ISSET((net_socket_t)fdc, &fds))
240  {
241  int32_t rlen = TransferRequestBlockFeedback75();
242  request = false; // request is completed
243  m_mutex->Unlock();
244  if (rlen < 0)
245  goto err;
246  DBG(DBG_DEBUG, "%s: receive block size (%u)\n", __FUNCTION__, (unsigned)rlen);
247  if (rlen == 0 && !data)
248  break; // no more data
249  fileRequest += rlen;
250  transfer.SetRequested(fileRequest);
251  }
252  } while (request || data || !s);
253  DBG(DBG_DEBUG, "%s: data read (%u)\n", __FUNCTION__, s);
254  return (int)s;
255 err:
256  if (request)
257  {
258  if (RcvMessageLength())
259  FlushMessage();
260  m_mutex->Unlock();
261  }
262  // Recover the file position or die
263  if (TransferSeek(transfer, filePosition, WHENCE_SET) < 0)
264  HangException();
265  return -1;
266 }
267 
268 bool ProtoPlayback::TransferRequestBlock75(ProtoTransfer& transfer, unsigned n)
269 {
270  // Note: Caller has to hold mutex until feedback or cancel point
271  char buf[32];
272 
273  if (!transfer.IsOpen())
274  return false;
275  std::string cmd("QUERY_FILETRANSFER ");
276  uint32_to_string(transfer.GetFileId(), buf);
277  cmd.append(buf);
278  cmd.append(PROTO_STR_SEPARATOR);
279  cmd.append("REQUEST_BLOCK");
280  cmd.append(PROTO_STR_SEPARATOR);
281  uint32_to_string(n, buf);
282  cmd.append(buf);
283 
284  // No wait for feedback
285  if (!SendCommand(cmd.c_str(), false))
286  return false;
287  return true;
288 }
289 
290 int32_t ProtoPlayback::TransferRequestBlockFeedback75()
291 {
292  int32_t rlen = 0;
293  std::string field;
294  if (!RcvMessageLength() || !ReadField(field) || 0 != string_to_int32(field.c_str(), &rlen) || rlen < 0)
295  {
296  DBG(DBG_ERROR, "%s: invalid response for request block (%s)\n", __FUNCTION__, field.c_str());
297  FlushMessage();
298  return -1;
299  }
300  return rlen;
301 }
302 
303 int64_t ProtoPlayback::TransferSeek75(ProtoTransfer& transfer, int64_t offset, WHENCE_t whence)
304 {
305  char buf[32];
306  int64_t position = 0;
307  std::string field;
308 
309  int64_t filePosition = transfer.GetPosition();
310  int64_t fileSize = transfer.GetSize();
311 
312  // Check offset
313  switch (whence)
314  {
315  case WHENCE_CUR:
316  if (offset == 0)
317  return filePosition;
318  position = filePosition + offset;
319  if (position < 0 || position > fileSize)
320  return -1;
321  break;
322  case WHENCE_SET:
323  if (offset == filePosition)
324  return filePosition;
325  if (offset < 0 || offset > fileSize)
326  return -1;
327  break;
328  case WHENCE_END:
329  position = fileSize - offset;
330  if (position < 0 || position > fileSize)
331  return -1;
332  break;
333  default:
334  return -1;
335  }
336 
337  OS::CLockGuard lock(*m_mutex);
338  if (!transfer.IsOpen())
339  return -1;
340  std::string cmd("QUERY_FILETRANSFER ");
341  uint32_to_string(transfer.GetFileId(), buf);
342  cmd.append(buf);
343  cmd.append(PROTO_STR_SEPARATOR);
344  cmd.append("SEEK");
345  cmd.append(PROTO_STR_SEPARATOR);
346  int64_to_string(offset, buf);
347  cmd.append(buf);
348  cmd.append(PROTO_STR_SEPARATOR);
349  int8_to_string(whence, buf);
350  cmd.append(buf);
351  cmd.append(PROTO_STR_SEPARATOR);
352  int64_to_string(filePosition, buf);
353  cmd.append(buf);
354 
355  if (!SendCommand(cmd.c_str()))
356  return -1;
357  if (!ReadField(field) || 0 != string_to_int64(field.c_str(), &position))
358  {
359  FlushMessage();
360  return -1;
361  }
362  // Reset transfer
363  transfer.Flush();
364  transfer.SetRequested(position);
365  transfer.SetPosition(position);
366  return position;
367 }
void Flush()
Flushing unread data previously requested.
bool m_hang
Connection hang: while true allow retry.
Definition: mythprotobase.h:76
bool ReadField(std::string &field)
This is the main namespace that encloses all public classes.
Definition: mythcontrol.h:29
bool m_tainted
Connection has hung since last reset.
Definition: mythprotobase.h:77