filepush.h/cpp: migrate to iDataSource
[vuplus_dvbapp] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 //FILE *f = fopen("/log.ts", "wb");
10
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
13 {
14         m_stop = 0;
15         m_sg = 0;
16         m_send_pvr_commit = 0;
17         m_stream_mode = 0;
18         m_blocksize = blocksize;
19         flush();
20         enablePVRCommit(0);
21         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
22 }
23
24 static void signal_handler(int x)
25 {
26 }
27
28 void eFilePushThread::thread()
29 {
30         setIoPrio(prio_class, prio);
31
32         off_t dest_pos = 0, source_pos = 0;
33         size_t bytes_read = 0;
34         
35         off_t current_span_offset = 0;
36         size_t current_span_remaining = 0;
37         
38         size_t written_since_last_sync = 0;
39
40         eDebug("FILEPUSH THREAD START");
41         
42                 /* we set the signal to not restart syscalls, so we can detect our signal. */
43         struct sigaction act;
44         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
45         act.sa_flags = 0;
46         sigaction(SIGUSR1, &act, 0);
47         
48         hasStarted();
49         
50         source_pos = m_raw_source->lseek(0, SEEK_CUR);
51         
52                 /* m_stop must be evaluated after each syscall. */
53         while (!m_stop)
54         {
55                         /* first try flushing the bufptr */
56                 if (m_buf_start != m_buf_end)
57                 {
58                                 /* filterRecordData wants to work on multiples of blocksize.
59                                    if it returns a negative result, it means that this many bytes should be skipped
60                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
61                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
62                                    Skipped bytes will also not be output.
63
64                                    if it returns a positive result, that means that only these many bytes should be used
65                                    in the buffer. 
66                                    
67                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
68                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
69                                    skip some data). This is probably a very special application for fast-forward, where the current
70                                    span is to be cancelled after a complete iframe has been output.
71
72                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
73                                    
74                                    we filter data only once, of course, but it might not get immediately written.
75                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
76                                 */
77                         
78                         int filter_res;
79                         
80                         do
81                         {
82                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
83
84                                 if (filter_res < 0)
85                                 {
86                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
87                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
88                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
89                                         continue; /* try again */
90                                 }
91                                 
92                                         /* adjust end of buffer to strip dropped tail bytes */
93                                 m_buf_end = m_filter_end + filter_res;
94                                         /* mark data as filtered. */
95                                 m_filter_end = m_buf_end;
96                         } while (0);
97                         
98                         ASSERT(m_filter_end == m_buf_end);
99                         
100                         if (m_buf_start == m_buf_end)
101                                 continue;
102
103                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
104                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
105                                 */
106                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
107 //                      fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
108 //                      eDebug("wrote %d bytes", w);
109                         if (w <= 0)
110                         {
111                                 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
112                                         continue;
113                                 eDebug("eFilePushThread WRITE ERROR");
114                                 sendEvent(evtWriteError);
115                                 break;
116                                 // ... we would stop the thread
117                         }
118
119                         written_since_last_sync += w;
120
121                         if (written_since_last_sync >= 512*1024)
122                         {
123                                 int toflush = written_since_last_sync > 2*1024*1024 ?
124                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
125                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
126                                 dest_pos -= toflush;
127                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
128                                 written_since_last_sync -= toflush;
129                         }
130
131 //                      printf("FILEPUSH: wrote %d bytes\n", w);
132                         m_buf_start += w;
133                         continue;
134                 }
135
136                         /* now fill our buffer. */
137                         
138                 if (m_sg && !current_span_remaining)
139                 {
140                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
141                         ASSERT(!(current_span_remaining % m_blocksize));
142
143                         if (source_pos != current_span_offset)
144                                 source_pos = m_raw_source->lseek(current_span_offset, SEEK_SET);
145                         bytes_read = 0;
146                 }
147                 
148                 size_t maxread = sizeof(m_buffer);
149                 
150                         /* if we have a source span, don't read past the end */
151                 if (m_sg && maxread > current_span_remaining)
152                         maxread = current_span_remaining;
153
154                         /* align to blocksize */
155                 maxread -= maxread % m_blocksize;
156
157                 m_buf_start = 0;
158                 m_filter_end = 0;
159                 m_buf_end = 0;
160                 
161                 if (maxread)
162                         m_buf_end = m_raw_source->read(m_buffer, maxread);
163
164                 if (m_buf_end < 0)
165                 {
166                         m_buf_end = 0;
167                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
168                                 continue;
169                         if (errno == EOVERFLOW)
170                         {
171                                 eWarning("OVERFLOW while recording");
172                                 continue;
173                         }
174                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
175                 }
176
177                         /* a read might be mis-aligned in case of a short read. */
178                 int d = m_buf_end % m_blocksize;
179                 if (d)
180                 {
181                         m_raw_source->lseek(-d, SEEK_CUR);
182                         m_buf_end -= d;
183                 }
184
185                 if (m_buf_end == 0)
186                 {
187                                 /* on EOF, try COMMITting once. */
188                         if (m_send_pvr_commit)
189                         {
190                                 struct pollfd pfd;
191                                 pfd.fd = m_fd_dest;
192                                 pfd.events = POLLIN;
193                                 switch (poll(&pfd, 1, 250)) // wait for 250ms
194                                 {
195                                         case 0:
196                                                 eDebug("wait for driver eof timeout");
197                                                 continue;
198                                         case 1:
199                                                 eDebug("wait for driver eof ok");
200                                                 break;
201                                         default:
202                                                 eDebug("wait for driver eof aborted by signal");
203                                                 continue;
204                                 }
205                         }
206                         
207                                 /* in stream_mode, we are sending EOF events 
208                                    over and over until somebody responds.
209                                    
210                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
211                         sendEvent(evtEOF);
212
213                         if (m_stream_mode)
214                         {
215                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
216                                 sleep(1);
217                                 continue;
218                         }
219 #if 0
220                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
221                         if (!m_raw_source->lseek(0, SEEK_SET))
222                         {
223                                 eDebug("(looping)");
224                                 continue;
225                         }
226 #endif
227                         break;
228                 } else
229                 {
230                         source_pos += m_buf_end;
231                         bytes_read += m_buf_end;
232                         if (m_sg)
233                                 current_span_remaining -= m_buf_end;
234                 }
235 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
236         }
237         fdatasync(m_fd_dest);
238
239         eDebug("FILEPUSH THREAD STOP");
240 }
241
242 void eFilePushThread::start(int fd, int fd_dest)
243 {
244         eRawFile *f = new eRawFile();
245         f->setfd(fd);
246         m_raw_source = f;
247         m_fd_dest = fd_dest;
248         resume();
249 }
250
251 int eFilePushThread::start(const char *file, int fd_dest)
252 {
253         eRawFile *f = new eRawFile();
254         ePtr<iDataSource> source = f;
255         if (f->open(file) < 0)
256                 return -1;
257         start(source, fd_dest);
258         return 0;
259 }
260
261 void eFilePushThread::start(ePtr<iDataSource> source, int fd_dest)
262 {
263         m_raw_source = source;
264         m_fd_dest = fd_dest;
265         resume();
266 }
267
268 void eFilePushThread::stop()
269 {
270                 /* if we aren't running, don't bother stopping. */
271         if (!sync())
272                 return;
273
274         m_stop = 1;
275
276         eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
277         sendSignal(SIGUSR1);
278         kill(0);
279 }
280
281 void eFilePushThread::pause()
282 {
283         stop();
284 }
285
286 void eFilePushThread::seek(int whence, off_t where)
287 {
288         m_raw_source->lseek(where, whence);
289 }
290
291 void eFilePushThread::resume()
292 {
293         m_stop = 0;
294         run();
295 }
296
297 void eFilePushThread::flush()
298 {
299         m_buf_start = m_buf_end = m_filter_end = 0;
300 }
301
302 void eFilePushThread::enablePVRCommit(int s)
303 {
304         m_send_pvr_commit = s;
305 }
306
307 void eFilePushThread::setStreamMode(int s)
308 {
309         m_stream_mode = s;
310 }
311
312 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
313 {
314         m_sg = sg;
315 }
316
317 void eFilePushThread::sendEvent(int evt)
318 {
319         m_messagepump.send(evt);
320 }
321
322 void eFilePushThread::recvEvent(const int &evt)
323 {
324         m_event(evt);
325 }
326
327 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
328 {
329         return len;
330 }