use POLLPRI instead of POLLHUP
[vuplus_dvbapp] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 //FILE *f = fopen("/log.ts", "wb");
10
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
13 {
14         m_stop = 0;
15         m_sg = 0;
16         m_send_pvr_commit = 0;
17         m_stream_mode = 0;
18         m_blocksize = blocksize;
19         flush();
20         enablePVRCommit(0);
21         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
22 }
23
/* Deliberately empty handler for SIGUSR1: it is installed so that the
   signal is delivered (and interrupts a blocking call) rather than
   being ignored or acted upon by default. */
static void signal_handler(int)
{
}
27
28 void eFilePushThread::thread()
29 {
30         setIoPrio(prio_class, prio);
31
32         off_t dest_pos = 0, source_pos = 0;
33         size_t bytes_read = 0;
34         
35         off_t current_span_offset = 0;
36         size_t current_span_remaining = 0;
37         
38         size_t written_since_last_sync = 0;
39
40         int already_empty = 0;
41         eDebug("FILEPUSH THREAD START");
42         
43                 /* we set the signal to not restart syscalls, so we can detect our signal. */
44         struct sigaction act;
45         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
46         act.sa_flags = 0;
47         sigaction(SIGUSR1, &act, 0);
48         
49         hasStarted();
50         
51         source_pos = m_raw_source.lseek(0, SEEK_CUR);
52         
53                 /* m_stop must be evaluated after each syscall. */
54         while (!m_stop)
55         {
56                         /* first try flushing the bufptr */
57                 if (m_buf_start != m_buf_end)
58                 {
59                                 /* filterRecordData wants to work on multiples of blocksize.
60                                    if it returns a negative result, it means that this many bytes should be skipped
61                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
62                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
63                                    Skipped bytes will also not be output.
64
65                                    if it returns a positive result, that means that only these many bytes should be used
66                                    in the buffer. 
67                                    
68                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
69                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
70                                    skip some data). This is probably a very special application for fast-forward, where the current
71                                    span is to be cancelled after a complete iframe has been output.
72
73                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
74                                    
75                                    we filter data only once, of course, but it might not get immediately written.
76                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
77                                 */
78                         
79                         int filter_res;
80                         
81                         do
82                         {
83                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
84
85                                 if (filter_res < 0)
86                                 {
87                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
88                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
89                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
90                                         continue; /* try again */
91                                 }
92                                 
93                                         /* adjust end of buffer to strip dropped tail bytes */
94                                 m_buf_end = m_filter_end + filter_res;
95                                         /* mark data as filtered. */
96                                 m_filter_end = m_buf_end;
97                         } while (0);
98                         
99                         ASSERT(m_filter_end == m_buf_end);
100                         
101                         if (m_buf_start == m_buf_end)
102                                 continue;
103
104                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
105                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
106                                 */
107                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
108 //                      fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
109 //                      eDebug("wrote %d bytes", w);
110                         if (w <= 0)
111                         {
112                                 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
113                                         continue;
114                                 eDebug("eFilePushThread WRITE ERROR");
115                                 sendEvent(evtWriteError);
116                                 break;
117                                 // ... we would stop the thread
118                         }
119
120                         written_since_last_sync += w;
121
122                         if (written_since_last_sync >= 512*1024)
123                         {
124                                 int toflush = written_since_last_sync > 2*1024*1024 ?
125                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
126                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
127                                 dest_pos -= toflush;
128                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
129                                 written_since_last_sync -= toflush;
130                         }
131
132 //                      printf("FILEPUSH: wrote %d bytes\n", w);
133                         m_buf_start += w;
134                         continue;
135                 }
136
137                         /* now fill our buffer. */
138                         
139                 if (m_sg && !current_span_remaining)
140                 {
141                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
142                         ASSERT(!(current_span_remaining % m_blocksize));
143
144                         if (source_pos != current_span_offset)
145                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
146                         bytes_read = 0;
147                 }
148                 
149                 size_t maxread = sizeof(m_buffer);
150                 
151                         /* if we have a source span, don't read past the end */
152                 if (m_sg && maxread > current_span_remaining)
153                         maxread = current_span_remaining;
154
155                         /* align to blocksize */
156                 maxread -= maxread % m_blocksize;
157
158                 m_buf_start = 0;
159                 m_filter_end = 0;
160                 m_buf_end = 0;
161                 
162                 if (maxread)
163                         m_buf_end = m_raw_source.read(m_buffer, maxread);
164
165                 if (m_buf_end < 0)
166                 {
167                         m_buf_end = 0;
168                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
169                                 continue;
170                         if (errno == EOVERFLOW)
171                         {
172                                 eWarning("OVERFLOW while recording");
173                                 continue;
174                         }
175                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
176                 }
177
178                         /* a read might be mis-aligned in case of a short read. */
179                 int d = m_buf_end % m_blocksize;
180                 if (d)
181                 {
182                         m_raw_source.lseek(-d, SEEK_CUR);
183                         m_buf_end -= d;
184                 }
185
186                 if (m_buf_end == 0)
187                 {
188                                 /* on EOF, try COMMITting once. */
189                         if (m_send_pvr_commit && !already_empty)
190                         {
191                                 eDebug("sending PVR commit");
192                                 
193                                 struct pollfd pfd;
194                                 pfd.fd = m_fd_dest;
195                                 pfd.events = POLLPRI;
196                                 poll(&pfd, 1, 10000);
197                                 sleep(5); /* HACK to allow ES buffer to drain */
198                                 already_empty = 1;
199 //                              if (::ioctl(m_fd_dest, PVR_COMMIT) < 0 && errno == EINTR)
200 //                                      continue;
201                                 eDebug("commit done");
202                                                 /* well check again */
203                                 continue;
204                         }
205                         
206                                 /* in stream_mode, we are sending EOF events 
207                                    over and over until somebody responds.
208                                    
209                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
210                         sendEvent(evtEOF);
211
212                         if (m_stream_mode)
213                         {
214                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
215                                 sleep(1);
216                                 continue;
217                         }
218 #if 0
219                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
220                         if (!m_raw_source.lseek(0, SEEK_SET))
221                         {
222                                 eDebug("(looping)");
223                                 continue;
224                         }
225 #endif
226                         break;
227                 } else
228                 {
229                         source_pos += m_buf_end;
230                         bytes_read += m_buf_end;
231                         if (m_sg)
232                                 current_span_remaining -= m_buf_end;
233                         already_empty = 0;
234                 }
235 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
236         }
237         fdatasync(m_fd_dest);
238
239         eDebug("FILEPUSH THREAD STOP");
240 }
241
242 void eFilePushThread::start(int fd_source, int fd_dest)
243 {
244         m_raw_source.setfd(fd_source);
245         m_fd_dest = fd_dest;
246         resume();
247 }
248
249 int eFilePushThread::start(const char *filename, int fd_dest)
250 {
251         if (m_raw_source.open(filename) < 0)
252                 return -1;
253         m_fd_dest = fd_dest;
254         resume();
255         return 0;
256 }
257
258 void eFilePushThread::stop()
259 {
260                 /* if we aren't running, don't bother stopping. */
261         if (!sync())
262                 return;
263
264         m_stop = 1;
265
266         eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
267         sendSignal(SIGUSR1);
268         kill(0);
269 }
270
271 void eFilePushThread::pause()
272 {
273         stop();
274 }
275
276 void eFilePushThread::seek(int whence, off_t where)
277 {
278         m_raw_source.lseek(where, whence);
279 }
280
281 void eFilePushThread::resume()
282 {
283         m_stop = 0;
284         run();
285 }
286
287 void eFilePushThread::flush()
288 {
289         m_buf_start = m_buf_end = m_filter_end = 0;
290 }
291
292 void eFilePushThread::enablePVRCommit(int s)
293 {
294         m_send_pvr_commit = s;
295 }
296
297 void eFilePushThread::setStreamMode(int s)
298 {
299         m_stream_mode = s;
300 }
301
302 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
303 {
304         m_sg = sg;
305 }
306
307 void eFilePushThread::sendEvent(int evt)
308 {
309         m_messagepump.send(evt);
310 }
311
312 void eFilePushThread::recvEvent(const int &evt)
313 {
314         m_event(evt);
315 }
316
317 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
318 {
319         return len;
320 }