filepush.cpp: remove no more needed code.. this fixes i.e. timeshift to live transiti...
[vuplus_dvbapp] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 //FILE *f = fopen("/log.ts", "wb");
10
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
13 {
14         m_stop = 0;
15         m_sg = 0;
16         m_send_pvr_commit = 0;
17         m_stream_mode = 0;
18         m_blocksize = blocksize;
19         flush();
20         enablePVRCommit(0);
21         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
22 }
23
24 static void signal_handler(int x)
25 {
26 }
27
28 void eFilePushThread::thread()
29 {
30         setIoPrio(prio_class, prio);
31
32         off_t dest_pos = 0, source_pos = 0;
33         size_t bytes_read = 0;
34         
35         off_t current_span_offset = 0;
36         size_t current_span_remaining = 0;
37         
38         size_t written_since_last_sync = 0;
39
40         eDebug("FILEPUSH THREAD START");
41         
42                 /* we set the signal to not restart syscalls, so we can detect our signal. */
43         struct sigaction act;
44         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
45         act.sa_flags = 0;
46         sigaction(SIGUSR1, &act, 0);
47         
48         hasStarted();
49         
50         source_pos = m_raw_source.lseek(0, SEEK_CUR);
51         
52                 /* m_stop must be evaluated after each syscall. */
53         while (!m_stop)
54         {
55                         /* first try flushing the bufptr */
56                 if (m_buf_start != m_buf_end)
57                 {
58                                 /* filterRecordData wants to work on multiples of blocksize.
59                                    if it returns a negative result, it means that this many bytes should be skipped
60                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
61                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
62                                    Skipped bytes will also not be output.
63
64                                    if it returns a positive result, that means that only these many bytes should be used
65                                    in the buffer. 
66                                    
67                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
68                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
69                                    skip some data). This is probably a very special application for fast-forward, where the current
70                                    span is to be cancelled after a complete iframe has been output.
71
72                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
73                                    
74                                    we filter data only once, of course, but it might not get immediately written.
75                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
76                                 */
77                         
78                         int filter_res;
79                         
80                         do
81                         {
82                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
83
84                                 if (filter_res < 0)
85                                 {
86                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
87                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
88                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
89                                         continue; /* try again */
90                                 }
91                                 
92                                         /* adjust end of buffer to strip dropped tail bytes */
93                                 m_buf_end = m_filter_end + filter_res;
94                                         /* mark data as filtered. */
95                                 m_filter_end = m_buf_end;
96                         } while (0);
97                         
98                         ASSERT(m_filter_end == m_buf_end);
99                         
100                         if (m_buf_start == m_buf_end)
101                                 continue;
102
103                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
104                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
105                                 */
106                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
107 //                      fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
108 //                      eDebug("wrote %d bytes", w);
109                         if (w <= 0)
110                         {
111                                 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
112                                         continue;
113                                 eDebug("eFilePushThread WRITE ERROR");
114                                 sendEvent(evtWriteError);
115                                 break;
116                                 // ... we would stop the thread
117                         }
118
119                         written_since_last_sync += w;
120
121                         if (written_since_last_sync >= 512*1024)
122                         {
123                                 int toflush = written_since_last_sync > 2*1024*1024 ?
124                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
125                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
126                                 dest_pos -= toflush;
127                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
128                                 written_since_last_sync -= toflush;
129                         }
130
131 //                      printf("FILEPUSH: wrote %d bytes\n", w);
132                         m_buf_start += w;
133                         continue;
134                 }
135
136                         /* now fill our buffer. */
137                         
138                 if (m_sg && !current_span_remaining)
139                 {
140                         m_sg->getNextSourceSpan(source_pos, bytes_read, current_span_offset, current_span_remaining);
141                         ASSERT(!(current_span_remaining % m_blocksize));
142
143                         if (source_pos != current_span_offset)
144                                 source_pos = m_raw_source.lseek(current_span_offset, SEEK_SET);
145                         bytes_read = 0;
146                 }
147                 
148                 size_t maxread = sizeof(m_buffer);
149                 
150                         /* if we have a source span, don't read past the end */
151                 if (m_sg && maxread > current_span_remaining)
152                         maxread = current_span_remaining;
153
154                         /* align to blocksize */
155                 maxread -= maxread % m_blocksize;
156
157                 m_buf_start = 0;
158                 m_filter_end = 0;
159                 m_buf_end = 0;
160                 
161                 if (maxread)
162                         m_buf_end = m_raw_source.read(m_buffer, maxread);
163
164                 if (m_buf_end < 0)
165                 {
166                         m_buf_end = 0;
167                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
168                                 continue;
169                         if (errno == EOVERFLOW)
170                         {
171                                 eWarning("OVERFLOW while recording");
172                                 continue;
173                         }
174                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
175                 }
176
177                         /* a read might be mis-aligned in case of a short read. */
178                 int d = m_buf_end % m_blocksize;
179                 if (d)
180                 {
181                         m_raw_source.lseek(-d, SEEK_CUR);
182                         m_buf_end -= d;
183                 }
184
185                 if (m_buf_end == 0)
186                 {
187                                 /* on EOF, try COMMITting once. */
188                         if (m_send_pvr_commit)
189                         {
190                                 eDebug("sending PVR commit");
191                                 struct pollfd pfd;
192                                 pfd.fd = m_fd_dest;
193                                 pfd.events = POLLIN;
194                                 poll(&pfd, 1, -1);
195                                 eDebug("commit done");
196                         }
197                         
198                                 /* in stream_mode, we are sending EOF events 
199                                    over and over until somebody responds.
200                                    
201                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
202                         sendEvent(evtEOF);
203
204                         if (m_stream_mode)
205                         {
206                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
207                                 sleep(1);
208                                 continue;
209                         }
210 #if 0
211                         eDebug("FILEPUSH: end-of-file! (currently unhandled)");
212                         if (!m_raw_source.lseek(0, SEEK_SET))
213                         {
214                                 eDebug("(looping)");
215                                 continue;
216                         }
217 #endif
218                         break;
219                 } else
220                 {
221                         source_pos += m_buf_end;
222                         bytes_read += m_buf_end;
223                         if (m_sg)
224                                 current_span_remaining -= m_buf_end;
225                 }
226 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
227         }
228         fdatasync(m_fd_dest);
229
230         eDebug("FILEPUSH THREAD STOP");
231 }
232
233 void eFilePushThread::start(int fd_source, int fd_dest)
234 {
235         m_raw_source.setfd(fd_source);
236         m_fd_dest = fd_dest;
237         resume();
238 }
239
240 int eFilePushThread::start(const char *filename, int fd_dest)
241 {
242         if (m_raw_source.open(filename) < 0)
243                 return -1;
244         m_fd_dest = fd_dest;
245         resume();
246         return 0;
247 }
248
249 void eFilePushThread::stop()
250 {
251                 /* if we aren't running, don't bother stopping. */
252         if (!sync())
253                 return;
254
255         m_stop = 1;
256
257         eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
258         sendSignal(SIGUSR1);
259         kill(0);
260 }
261
262 void eFilePushThread::pause()
263 {
264         stop();
265 }
266
267 void eFilePushThread::seek(int whence, off_t where)
268 {
269         m_raw_source.lseek(where, whence);
270 }
271
272 void eFilePushThread::resume()
273 {
274         m_stop = 0;
275         run();
276 }
277
278 void eFilePushThread::flush()
279 {
280         m_buf_start = m_buf_end = m_filter_end = 0;
281 }
282
283 void eFilePushThread::enablePVRCommit(int s)
284 {
285         m_send_pvr_commit = s;
286 }
287
288 void eFilePushThread::setStreamMode(int s)
289 {
290         m_stream_mode = s;
291 }
292
293 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
294 {
295         m_sg = sg;
296 }
297
298 void eFilePushThread::sendEvent(int evt)
299 {
300         m_messagepump.send(evt);
301 }
302
303 void eFilePushThread::recvEvent(const int &evt)
304 {
305         m_event(evt);
306 }
307
308 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
309 {
310         return len;
311 }