Support turbo2.
[vuplus_dvbapp] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <lib/base/nconfig.h>
4 #include <errno.h>
5 #include <fcntl.h>
6 #include <sys/ioctl.h>
7 #include <sys/vfs.h>
8 #if 0
9 #include <dirent.h>
10 #else
11 #include <sys/types.h>
12 #endif
13
14 #define PVR_COMMIT 1
15
16 #define MAJORSD_        8
17 #define MAJORMMCBLK     179
18 #define LIMIT_FILESIZE_NOHDD    2*1024*1024*1024LL      // 2GBytes
19
20 //FILE *f = fopen("/log.ts", "wb");
21 static bool g_is_diskfull = false;
22
23 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
24         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
25 {
26         m_stop = 0;
27         m_sg = 0;
28         m_send_pvr_commit = 0;
29         m_stream_mode = 0;
30         m_blocksize = blocksize;
31         flush();
32         enablePVRCommit(0);
33         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
34         m_is_timeshift = false;
35         m_hdd_connected = true;
36 }
37
38 static void signal_handler(int x)
39 {
40 }
41
42 void eFilePushThread::thread()
43 {
44         setIoPrio(prio_class, prio);
45
46         off_t dest_pos = 0;
47         size_t bytes_read = 0;
48         
49         off_t current_span_offset = 0;
50         size_t current_span_remaining = 0;
51         
52         size_t written_since_last_sync = 0;
53
54 #if 0
55         DIR *tsdir_info;
56         struct dirent *tsdir_entry;
57         tsdir_info = opendir("/sys/block");
58         if (tsdir_info != NULL) {
59                 m_hdd_connected = false;
60                 while (tsdir_entry = readdir(tsdir_info)) {
61                         if (strncmp(tsdir_entry->d_name, "sd", 2) == 0) {
62                                 eDebug("HDD found: %s", tsdir_entry->d_name);
63                                 m_hdd_connected = true;
64                                 break;
65                         }
66                 }
67         }
68 #else
69         if (m_tspath.empty())
70                 defaultTSPath(m_is_timeshift);
71
72         struct stat tspath_st;
73         if (stat(m_tspath.c_str(), &tspath_st) == 0) {
74                 if (major(tspath_st.st_dev) == MAJORSD_) {
75                         eDebug("%s location on HDD!", m_tspath.c_str());
76                         m_hdd_connected = true;
77                 } else if (major(tspath_st.st_dev) == MAJORMMCBLK) {
78                         eDebug("%s location on eMMC!", m_tspath.c_str());
79                         m_hdd_connected = false;
80                 } else {
81                         eDebug("%s location on other device", m_tspath.c_str());
82                 }
83         } else {
84                 eDebug("stat failed!");
85         }
86 #endif
87
88         eDebug("FILEPUSH THREAD START");
89         
90                 /* we set the signal to not restart syscalls, so we can detect our signal. */
91         struct sigaction act;
92         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
93         act.sa_flags = 0;
94         sigaction(SIGUSR1, &act, 0);
95         
96         hasStarted();
97
98                 /* m_stop must be evaluated after each syscall. */
99         while (!m_stop)
100         {
101                         /* first try flushing the bufptr */
102                 if (m_buf_start != m_buf_end)
103                 {
104                                 /* filterRecordData wants to work on multiples of blocksize.
105                                    if it returns a negative result, it means that this many bytes should be skipped
106                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
107                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
108                                    Skipped bytes will also not be output.
109
110                                    if it returns a positive result, that means that only these many bytes should be used
111                                    in the buffer. 
112                                    
113                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
114                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
115                                    skip some data). This is probably a very special application for fast-forward, where the current
116                                    span is to be cancelled after a complete iframe has been output.
117
118                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
119                                    
120                                    we filter data only once, of course, but it might not get immediately written.
121                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
122                                 */
123                         
124                         int filter_res;
125                         
126                         do
127                         {
128                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
129
130                                 if (filter_res < 0)
131                                 {
132                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
133                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
134                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
135                                         continue; /* try again */
136                                 }
137                                 
138                                         /* adjust end of buffer to strip dropped tail bytes */
139                                 m_buf_end = m_filter_end + filter_res;
140                                         /* mark data as filtered. */
141                                 m_filter_end = m_buf_end;
142                         } while (0);
143                         
144                         ASSERT(m_filter_end == m_buf_end);
145                         
146                         if (m_buf_start == m_buf_end)
147                                 continue;
148
149                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
150                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
151                                 */
152                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
153 //                      fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
154 //                      eDebug("wrote %d bytes", w);
155                         if (w <= 0)
156                         {
157                                 if (w < 0 && (errno == EINTR || errno == EAGAIN || errno == EBUSY))
158                                         continue;
159                                 eDebug("eFilePushThread WRITE ERROR");
160                                 sendEvent(evtWriteError);
161
162                                 struct statfs fs;
163                                 if (statfs(m_tspath.c_str(), &fs) < 0) {
164                                         eDebug("statfs failed!");
165                                 }
166                                 if ((off_t)fs.f_bavail < 1) {
167                                         eDebug("not enough diskspace!");
168                                         g_is_diskfull = true;
169                                 }
170                                 break;
171                                 // ... we would stop the thread
172                         }
173
174                         written_since_last_sync += w;
175
176                         if (written_since_last_sync >= 512*1024)
177                         {
178                                 int toflush = written_since_last_sync > 2*1024*1024 ?
179                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
180                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
181                                 dest_pos -= toflush;
182                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
183                                 written_since_last_sync -= toflush;
184                         }
185
186 //                      printf("FILEPUSH: wrote %d bytes\n", w);
187                         m_buf_start += w;
188                         continue;
189                 }
190
191                 if (!m_hdd_connected) {
192                         struct stat limit_filesize;
193                         if (fstat(m_fd_dest, &limit_filesize) == 0) {
194                                 if (limit_filesize.st_size > LIMIT_FILESIZE_NOHDD) {
195                                         eDebug("eFilePushThread %lld > %lld LIMIT FILESIZE", limit_filesize.st_size, LIMIT_FILESIZE_NOHDD);
196                                         sendEvent(evtWriteError);
197
198                                         g_is_diskfull = true;
199                                         break;
200                                 }
201                         }
202                 }
203
204                         /* now fill our buffer. */
205                         
206                 if (m_sg && !current_span_remaining)
207                 {
208                         m_sg->getNextSourceSpan(m_current_position, bytes_read, current_span_offset, current_span_remaining);
209                         ASSERT(!(current_span_remaining % m_blocksize));
210                         m_current_position = current_span_offset;
211                         bytes_read = 0;
212                 }
213
214                 size_t maxread = sizeof(m_buffer);
215                 
216                         /* if we have a source span, don't read past the end */
217                 if (m_sg && maxread > current_span_remaining)
218                         maxread = current_span_remaining;
219
220                         /* align to blocksize */
221                 maxread -= maxread % m_blocksize;
222
223                 m_buf_start = 0;
224                 m_filter_end = 0;
225                 m_buf_end = 0;
226
227                 if (maxread)
228                         m_buf_end = m_source->read(m_current_position, m_buffer, maxread);
229
230                 if (m_buf_end < 0)
231                 {
232                         m_buf_end = 0;
233                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
234                                 continue;
235                         if (errno == EOVERFLOW)
236                         {
237                                 eWarning("OVERFLOW while recording");
238                                 continue;
239                         }
240                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
241                 }
242
243                         /* a read might be mis-aligned in case of a short read. */
244                 int d = m_buf_end % m_blocksize;
245                 if (d)
246                         m_buf_end -= d;
247
248                 if (m_buf_end == 0)
249                 {
250                                 /* on EOF, try COMMITting once. */
251                         if (m_send_pvr_commit)
252                         {
253                                 struct pollfd pfd;
254                                 pfd.fd = m_fd_dest;
255                                 pfd.events = POLLIN;
256                                 switch (poll(&pfd, 1, 250)) // wait for 250ms
257                                 {
258                                         case 0:
259                                                 eDebug("wait for driver eof timeout");
260                                                 continue;
261                                         case 1:
262                                                 eDebug("wait for driver eof ok");
263                                                 break;
264                                         default:
265                                                 eDebug("wait for driver eof aborted by signal");
266                                                 continue;
267                                 }
268                         }
269                         
270                                 /* in stream_mode, we are sending EOF events 
271                                    over and over until somebody responds.
272                                    
273                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
274                         sendEvent(evtEOF);
275
276                         if (m_stream_mode)
277                         {
278                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
279                                 sleep(1);
280                                 continue;
281                         }
282                         break;
283                 } else
284                 {
285                         m_current_position += m_buf_end;
286                         bytes_read += m_buf_end;
287                         if (m_sg)
288                                 current_span_remaining -= m_buf_end;
289                 }
290 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
291
292                 if (g_is_diskfull) {
293                         sendEvent(evtUser+3);
294                         g_is_diskfull = false;
295                 }
296         }
297         fdatasync(m_fd_dest);
298
299         eDebug("FILEPUSH THREAD STOP");
300 }
301
302 void eFilePushThread::start(int fd, int fd_dest)
303 {
304         eRawFile *f = new eRawFile();
305         ePtr<iTsSource> source = f;
306         f->setfd(fd);
307         start(source, fd_dest);
308 }
309
310 int eFilePushThread::start(const char *file, int fd_dest)
311 {
312         eRawFile *f = new eRawFile();
313         ePtr<iTsSource> source = f;
314         if (f->open(file) < 0)
315                 return -1;
316         start(source, fd_dest);
317         return 0;
318 }
319
320 void eFilePushThread::start(ePtr<iTsSource> &source, int fd_dest)
321 {
322         m_source = source;
323         m_fd_dest = fd_dest;
324         m_current_position = 0;
325         resume();
326 }
327
328 void eFilePushThread::stop()
329 {
330                 /* if we aren't running, don't bother stopping. */
331         if (!sync())
332                 return;
333
334         m_stop = 1;
335
336         eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
337         sendSignal(SIGUSR1);
338         kill(0);
339 }
340
341 void eFilePushThread::pause()
342 {
343         stop();
344 }
345
346 void eFilePushThread::resume()
347 {
348         m_stop = 0;
349         run();
350 }
351
352 void eFilePushThread::flush()
353 {
354         m_buf_start = m_buf_end = m_filter_end = 0;
355 }
356
357 void eFilePushThread::enablePVRCommit(int s)
358 {
359         m_send_pvr_commit = s;
360 }
361
362 void eFilePushThread::setStreamMode(int s)
363 {
364         m_stream_mode = s;
365 }
366
367 void eFilePushThread::setTimeshift(bool s)
368 {
369         m_is_timeshift = s;
370 }
371
372 void eFilePushThread::setTSPath(const std::string s)
373 {
374         m_tspath = s;
375 }
376
377 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
378 {
379         m_sg = sg;
380 }
381
382 void eFilePushThread::sendEvent(int evt)
383 {
384         m_messagepump.send(evt);
385 }
386
387 void eFilePushThread::recvEvent(const int &evt)
388 {
389         m_event(evt);
390 }
391
392 void eFilePushThread::defaultTSPath(bool s)
393 {
394         std::string tspath;
395
396         if (s) {
397                 if (ePythonConfigQuery::getConfigValue("config.usage.timeshift_path", tspath) == -1)
398                         eDebug("could not query ts path from config");
399         } else {
400                 if (ePythonConfigQuery::getConfigValue("config.usage.instantrec_path", tspath) == -1) {
401                         eDebug("could not query ts path from config");
402                 } else {
403                         if (tspath == "<default>") {
404                                 if (ePythonConfigQuery::getConfigValue("config.usage.default_path", tspath) == -1)
405                                         eDebug("could not query ts path from config");
406                         } else if (tspath == "<current>") {
407                                 if (ePythonConfigQuery::getConfigValue("config.movielist.last_videodir", tspath) == -1)
408                                         eDebug("could not query ts path from config");
409                         } else if (tspath == "<timer>") {
410                                 if (ePythonConfigQuery::getConfigValue("config.movielist.last_timer_videodir", tspath) == -1)
411                                         eDebug("could not query ts path from config");
412                         }
413                 }
414         }
415
416         if (!tspath.empty())
417                 tspath.append("/");
418
419         m_tspath = tspath;
420 }
421
422 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
423 {
424         return len;
425 }