1 #include <lib/base/filepush.h>
2 #include <lib/base/eerror.h>
9 //FILE *f = fopen("/log.ts", "wb");
/* Construct the push thread object.
   io_prio_class / io_prio_level are stored in prio_class / prio (thread() later
   hands them to setIoPrio()), blocksize is stored in m_blocksize, the PVR-commit
   flag starts cleared, and messages received on m_messagepump are connected to
   recvEvent(). */
11 eFilePushThread::eFilePushThread(int io_prio_class, int io_prio_level, int blocksize)
12 :prio_class(io_prio_class), prio(io_prio_level), m_messagepump(eApp, 0)
16 m_send_pvr_commit = 0;
18 m_blocksize = blocksize;
21 CONNECT(m_messagepump.recv_msg, eFilePushThread::recvEvent);
24 static void signal_handler(int x)
/* Worker routine of the push thread (summary of the statements visible here):
   - applies the configured I/O priority via setIoPrio() and installs
     signal_handler for SIGUSR1 so that blocking syscalls can return EINTR;
   - while data is buffered (m_buf_start != m_buf_end): runs filterRecordData()
     on the not-yet-filtered part starting at m_filter_end, then write()s the
     buffer to m_fd_dest; EINTR, EAGAIN and EBUSY are tested separately from the
     path that logs a write error and sends evtWriteError;
   - once at least 512 KiB have been written, issues POSIX_FADV_DONTNEED for at
     most 2 MB, using dest_pos obtained from lseek();
   - refills m_buffer through m_source->read() at m_current_position, limited to
     sizeof(m_buffer), to the current scatter/gather span (when m_sg is set) and
     to a multiple of m_blocksize;
   - the EOF path polls for up to 250 ms when m_send_pvr_commit is set;
   - before logging the stop message, calls fdatasync() on m_fd_dest.
   NOTE(review): loop and branch structure lines are not part of this listing;
   the ordering above is assumed from statement order, confirm against the full file. */
28 void eFilePushThread::thread()
30 setIoPrio(prio_class, prio);
33 size_t bytes_read = 0;
35 off_t current_span_offset = 0;
36 size_t current_span_remaining = 0;
38 size_t written_since_last_sync = 0;
40 eDebug("FILEPUSH THREAD START");
42 /* we set the signal to not restart syscalls, so we can detect our signal. */
44 act.sa_handler = signal_handler; // no, SIG_IGN doesn't do it. we want to receive the -EINTR
46 sigaction(SIGUSR1, &act, 0);
50 /* m_stop must be evaluated after each syscall. */
53 /* first try flushing the bufptr */
54 if (m_buf_start != m_buf_end)
56 /* filterRecordData wants to work on multiples of blocksize.
57 if it returns a negative result, it means that this many bytes should be skipped
58 *in front* of the buffer. Then, it will be called again with the newer, shorter buffer.
59 if filterRecordData wants to skip more data than currently available, it must do that internally.
60 Skipped bytes will also not be output.
62 if it returns a positive result, that means that only these many bytes should be used
65 In either case, current_span_remaining is given as a reference and can be modified. (Of course it
66 doesn't make sense to decrement it to a non-zero value unless you return 0 because that would just
67 skip some data). This is probably a very special application for fast-forward, where the current
68 span is to be cancelled after a complete iframe has been output.
70 we always call filterRecordData with our full buffer (otherwise we couldn't easily strip from the end)
72 we filter data only once, of course, but it might not get immediately written.
73 that's what m_filter_end is for - it points to the start of the unfiltered data.
80 filter_res = filterRecordData(m_buffer + m_filter_end, m_buf_end - m_filter_end, current_span_remaining);
84 eDebug("[eFilePushThread] filterRecordData re-syncs and skips %d bytes", -filter_res);
85 m_buf_start = m_filter_end + -filter_res; /* this will also drop unwritten data */
86 ASSERT(m_buf_start <= m_buf_end); /* otherwise filterRecordData skipped more data than available. */
87 continue; /* try again */
90 /* adjust end of buffer to strip dropped tail bytes */
91 m_buf_end = m_filter_end + filter_res;
92 /* mark data as filtered. */
93 m_filter_end = m_buf_end;
96 ASSERT(m_filter_end == m_buf_end);
98 if (m_buf_start == m_buf_end)
101 /* now write out data. it will be 'aligned' (according to filterRecordData).
102 absolutely forbidden is to return EINTR and consume a non-aligned number of bytes.
104 int w = write(m_fd_dest, m_buffer + m_buf_start, m_buf_end - m_buf_start);
105 // fwrite(m_buffer + m_buf_start, 1, m_buf_end - m_buf_start, f);
106 // eDebug("wrote %d bytes", w);
109 if (errno == EINTR || errno == EAGAIN || errno == EBUSY)
111 eDebug("eFilePushThread WRITE ERROR");
112 sendEvent(evtWriteError);
114 // ... we would stop the thread
117 written_since_last_sync += w;
119 if (written_since_last_sync >= 512*1024)
121 int toflush = written_since_last_sync > 2*1024*1024 ?
122 2*1024*1024 : written_since_last_sync &~ 4095; // write max 2MB at once
123 dest_pos = lseek(m_fd_dest, 0, SEEK_CUR);
125 posix_fadvise(m_fd_dest, dest_pos, toflush, POSIX_FADV_DONTNEED);
126 written_since_last_sync -= toflush;
129 // printf("FILEPUSH: wrote %d bytes\n", w);
134 /* now fill our buffer. */
136 if (m_sg && !current_span_remaining)
138 m_sg->getNextSourceSpan(m_current_position, bytes_read, current_span_offset, current_span_remaining);
139 ASSERT(!(current_span_remaining % m_blocksize));
140 m_current_position = current_span_offset;
144 size_t maxread = sizeof(m_buffer);
146 /* if we have a source span, don't read past the end */
147 if (m_sg && maxread > current_span_remaining)
148 maxread = current_span_remaining;
150 /* align to blocksize */
151 maxread -= maxread % m_blocksize;
158 m_buf_end = m_source->read(m_current_position, m_buffer, maxread);
163 if (errno == EINTR || errno == EBUSY || errno == EAGAIN)
165 if (errno == EOVERFLOW)
167 eWarning("OVERFLOW while recording");
170 eDebug("eFilePushThread *read error* (%m) - not yet handled");
173 /* a read might be mis-aligned in case of a short read. */
174 int d = m_buf_end % m_blocksize;
180 /* on EOF, try COMMITting once. */
181 if (m_send_pvr_commit)
186 switch (poll(&pfd, 1, 250)) // wait for 250ms
189 eDebug("wait for driver eof timeout");
192 eDebug("wait for driver eof ok");
195 eDebug("wait for driver eof aborted by signal");
200 /* in stream_mode, we are sending EOF events
201 over and over until somebody responds.
203 in stream_mode, think of evtEOF as "buffer underrun occurred". */
208 eDebug("reached EOF, but we are in stream mode. delaying 1 second.");
215 m_current_position += m_buf_end;
216 bytes_read += m_buf_end;
218 current_span_remaining -= m_buf_end;
220 // printf("FILEPUSH: read %d bytes\n", m_buf_end);
222 fdatasync(m_fd_dest);
224 eDebug("FILEPUSH THREAD STOP");
/* Convenience overload: creates a new eRawFile, holds it in an ePtr<iTsSource>
   and delegates to start(source, fd_dest).
   NOTE(review): the statement that attaches fd to the eRawFile is not visible
   in this listing; confirm how fd is handed over. */
227 void eFilePushThread::start(int fd, int fd_dest)
229 eRawFile *f = new eRawFile();
230 ePtr<iTsSource> source = f;
232 start(source, fd_dest);
/* Convenience overload: opens file through a new eRawFile (held in an
   ePtr<iTsSource>) and delegates to start(source, fd_dest).
   A negative result of eRawFile::open() is checked before start() is called.
   NOTE(review): the returned values are not visible in this listing; confirm
   the error and success codes against the full file. */
235 int eFilePushThread::start(const char *file, int fd_dest)
237 eRawFile *f = new eRawFile();
238 ePtr<iTsSource> source = f;
239 if (f->open(file) < 0)
241 start(source, fd_dest);
/* Start pushing from the given source to fd_dest.
   Resets m_current_position to 0; thread() uses that member as the position
   passed to m_source->read().
   NOTE(review): the remaining statements of this function are not visible in
   this listing. */
245 void eFilePushThread::start(ePtr<iTsSource> &source, int fd_dest)
249 m_current_position = 0;
/* Stop the push thread. Per the comments below, nothing is done when the
   thread is not running, and the stop request is issued only once. */
253 void eFilePushThread::stop()
255 /* if we aren't running, don't bother stopping. */
261 eDebug("stopping thread."); /* just do it ONCE. it won't help to do this more than once. */
266 void eFilePushThread::pause()
271 void eFilePushThread::resume()
/* Discard everything currently buffered: the start, end and filter-end offsets
   into m_buffer are all reset to 0, so thread() sees an empty buffer
   (m_buf_start == m_buf_end). */
277 void eFilePushThread::flush()
279 m_buf_start = m_buf_end = m_filter_end = 0;
/* Store s in m_send_pvr_commit. thread() tests this flag in its
   "on EOF, try COMMITting once" step. */
282 void eFilePushThread::enablePVRCommit(int s)
284 m_send_pvr_commit = s;
287 void eFilePushThread::setStreamMode(int s)
292 void eFilePushThread::setScatterGather(iFilePushScatterGather *sg)
/* Post evt to m_messagepump. The constructor connects the pump's recv_msg
   signal to recvEvent(), which is where posted events are received. */
297 void eFilePushThread::sendEvent(int evt)
299 m_messagepump.send(evt);
302 void eFilePushThread::recvEvent(const int &evt)
307 int eFilePushThread::filterRecordData(const unsigned char *data, int len, size_t &current_span_remaining)