TranscodingSetup : fix misspelling name.
[vuplus_dvbapp] / lib / base / filepush.cpp
1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
3 #include <errno.h>
4 #include <fcntl.h>
5 #include <sys/ioctl.h>
6
7 #define PVR_COMMIT 1
8
9 //FILE *f = fopen("/log.ts", "wb");
10
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12         :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
13 {
14         m_stop = 0;
15         m_sg = 0;
16         m_send_pvr_commit = 0;
17         m_stream_mode = 0;
18         m_blocksize = blocksize;
19         flush();
20         enablePVRCommit(0);
21         CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
22 }
23
24 static void signal_handler(int x)
25 {
26 }
27
28 void eFilePushThread::thread()
29 {
30         setIoPrio(prio_class, prio);
31
32         off_t dest_pos = 0;
33         size_t bytes_read = 0;
34         
35         off_t current_span_offset = 0;
36         size_t current_span_remaining = 0;
37         
38         size_t written_since_last_sync = 0;
39
40         eDebug("FILEPUSH THREAD START");
41         
42                 /* we set the signal to not restart syscalls, so we can detect our signal. */
43         struct sigaction act;
44         act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
45         act.sa_flags = 0;
46         sigaction(SIGUSR1, &act, 0);
47         
48         hasStarted();
49
50                 /* m_stop must be evaluated after each syscall. */
51         while (!m_stop)
52         {
53                         /* first try flushing the bufptr */
54                 if (m_buf_start != m_buf_end)
55                 {
56                                 /* filterRecordData wants to work on multiples of blocksize.
57                                    if it returns a negative result, it means that this many bytes should be skipped
58                                    *in front* of the buffer. Then, it will be called again. with the newer, shorter buffer.
59                                    if filterRecordData wants to skip more data then currently available, it must do that internally.
60                                    Skipped bytes will also not be output.
61
62                                    if it returns a positive result, that means that only these many bytes should be used
63                                    in the buffer. 
64                                    
65                                    In either case, current_span_remaining is given as a reference and can be modified. (Of course it 
66                                    doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
67                                    skip some data). This is probably a very special application for fast-forward, where the current
68                                    span is to be cancelled after a complete iframe has been output.
69
70                                    we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
71                                    
72                                    we filter data only once, of course, but it might not get immediately written.
73                                    that's what m_filter_end is for - it points to the start of the unfiltered data.
74                                 */
75                         
76                         int filter_res;
77                         
78                         do
79                         {
80                                 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
81
82                                 if (filter_res < 0)
83                                 {
84                                         eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
85                                         m_buf_start = m_filter_end + -filter_res;  /* this will also drop unwritten data */
86                                         ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
87                                         continue; /* try again */
88                                 }
89                                 
90                                         /* adjust end of buffer to strip dropped tail bytes */
91                                 m_buf_end = m_filter_end + filter_res;
92                                         /* mark data as filtered. */
93                                 m_filter_end = m_buf_end;
94                         } while (0);
95                         
96                         ASSERT(m_filter_end == m_buf_end);
97                         
98                         if (m_buf_start == m_buf_end)
99                                 continue;
100
101                                 /* now write out data. it will be 'aligned' (according to filterRecordData). 
102                                    absolutely forbidden is to return EINTR and consume a non-aligned number of bytes. 
103                                 */
104                         int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
105 //                      fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
106 //                      eDebug("wrote %d bytes", w);
107                         if (w <= 0)
108                         {
109                                 if (w < 0 && (errno == EINTR || errno == EAGAIN || errno == EBUSY))
110                                         continue;
111                                 eDebug("eFilePushThread WRITE ERROR");
112                                 sendEvent(evtWriteError);
113                                 break;
114                                 // ... we would stop the thread
115                         }
116
117                         written_since_last_sync += w;
118
119                         if (written_since_last_sync >= 512*1024)
120                         {
121                                 int toflush = written_since_last_sync > 2*1024*1024 ?
122                                         2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
123                                 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
124                                 dest_pos -= toflush;
125                                 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
126                                 written_since_last_sync -= toflush;
127                         }
128
129 //                      printf("FILEPUSH: wrote %d bytes\n", w);
130                         m_buf_start += w;
131                         continue;
132                 }
133
134                         /* now fill our buffer. */
135                         
136                 if (m_sg && !current_span_remaining)
137                 {
138                         m_sg->getNextSourceSpan(m_current_position, bytes_read, current_span_offset, current_span_remaining);
139                         ASSERT(!(current_span_remaining % m_blocksize));
140                         m_current_position = current_span_offset;
141                         bytes_read = 0;
142                 }
143
144                 size_t maxread = sizeof(m_buffer);
145                 
146                         /* if we have a source span, don't read past the end */
147                 if (m_sg && maxread > current_span_remaining)
148                         maxread = current_span_remaining;
149
150                         /* align to blocksize */
151                 maxread -= maxread % m_blocksize;
152
153                 m_buf_start = 0;
154                 m_filter_end = 0;
155                 m_buf_end = 0;
156
157                 if (maxread)
158                         m_buf_end = m_source->read(m_current_position, m_buffer, maxread);
159
160                 if (m_buf_end < 0)
161                 {
162                         m_buf_end = 0;
163                         if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
164                                 continue;
165                         if (errno == EOVERFLOW)
166                         {
167                                 eWarning("OVERFLOW while recording");
168                                 continue;
169                         }
170                         eDebug("eFilePushThread *read error* (%m) - not yet handled");
171                 }
172
173                         /* a read might be mis-aligned in case of a short read. */
174                 int d = m_buf_end % m_blocksize;
175                 if (d)
176                         m_buf_end -= d;
177
178                 if (m_buf_end == 0)
179                 {
180                                 /* on EOF, try COMMITting once. */
181                         if (m_send_pvr_commit)
182                         {
183                                 struct pollfd pfd;
184                                 pfd.fd = m_fd_dest;
185                                 pfd.events = POLLIN;
186                                 switch (poll(&pfd, 1, 250)) // wait for 250ms
187                                 {
188                                         case 0:
189                                                 eDebug("wait for driver eof timeout");
190                                                 continue;
191                                         case 1:
192                                                 eDebug("wait for driver eof ok");
193                                                 break;
194                                         default:
195                                                 eDebug("wait for driver eof aborted by signal");
196                                                 continue;
197                                 }
198                         }
199                         
200                                 /* in stream_mode, we are sending EOF events 
201                                    over and over until somebody responds.
202                                    
203                                    in stream_mode, think of evtEOF as "buffer underrun occured". */
204                         sendEvent(evtEOF);
205
206                         if (m_stream_mode)
207                         {
208                                 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
209                                 sleep(1);
210                                 continue;
211                         }
212                         break;
213                 } else
214                 {
215                         m_current_position += m_buf_end;
216                         bytes_read += m_buf_end;
217                         if (m_sg)
218                                 current_span_remaining -= m_buf_end;
219                 }
220 //              printf("FILEPUSH: read %d bytes\n", m_buf_end);
221         }
222         fdatasync(m_fd_dest);
223
224         eDebug("FILEPUSH THREAD STOP");
225 }
226
227 void eFilePushThread::start(int fd, int fd_dest)
228 {
229         eRawFile *f = new eRawFile();
230         ePtr<iTsSource> source = f;
231         f->setfd(fd);
232         start(source, fd_dest);
233 }
234
235 int eFilePushThread::start(const char *file, int fd_dest)
236 {
237         eRawFile *f = new eRawFile();
238         ePtr<iTsSource> source = f;
239         if (f->open(file) < 0)
240                 return -1;
241         start(source, fd_dest);
242         return 0;
243 }
244
245 void eFilePushThread::start(ePtr<iTsSource> &source, int fd_dest)
246 {
247         m_source = source;
248         m_fd_dest = fd_dest;
249         m_current_position = 0;
250         resume();
251 }
252
253 void eFilePushThread::stop()
254 {
255                 /* if we aren't running, don't bother stopping. */
256         if (!sync())
257                 return;
258
259         m_stop = 1;
260
261         eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
262         sendSignal(SIGUSR1);
263         kill(0);
264 }
265
266 void eFilePushThread::pause()
267 {
268         stop();
269 }
270
271 void eFilePushThread::resume()
272 {
273         m_stop = 0;
274         run();
275 }
276
277 void eFilePushThread::flush()
278 {
279         m_buf_start = m_buf_end = m_filter_end = 0;
280 }
281
282 void eFilePushThread::enablePVRCommit(int s)
283 {
284         m_send_pvr_commit = s;
285 }
286
287 void eFilePushThread::setStreamMode(int s)
288 {
289         m_stream_mode = s;
290 }
291
292 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
293 {
294         m_sg = sg;
295 }
296
297 void eFilePushThread::sendEvent(int evt)
298 {
299         m_messagepump.send(evt);
300 }
301
302 void eFilePushThread::recvEvent(const int &evt)
303 {
304         m_event(evt);
305 }
306
307 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)
308 {
309         return len;
310 }