4 * Created on: 2014. 6. 10.
28 //----------------------------------------------------------------------
// File descriptor that the HTTP-style response and the encoded stream are written to.
// Descriptor 1 is standard output by POSIX convention.
30 #define RESPONSE_FD (1)
// Size in bytes of the stack buffers used by both worker threads.
// NOTE(review): presumably 256 MPEG-TS packets of 188 bytes each - confirm.
31 #define BUFFFER_SIZE (188 * 256)
// Forward declarations; both functions are defined further down in this file.
33 void signal_handler(int sig_no);
34 void do_exit(const char *message);
// Stop flag shared by main() and both worker threads. It starts as true, main()
// clears it right before creating the threads, and do_exit() sets it again.
// NOTE(review): this is a plain bool that is written from a signal handler and read
// by other threads with no visible synchronization - consider std::atomic<bool>
// (or volatile sig_atomic_t for the signal path).
36 static bool is_terminated = true;
37 //----------------------------------------------------------------------
// Thread entry point that forwards encoder output to RESPONSE_FD.
//
// params: pointer to a ThreadParams; its `encoder` and `request` members are read.
// The loop polls the encoder descriptor once per second until is_terminated
// becomes true, copying whatever the encoder produces to RESPONSE_FD. Before
// leaving, transcoding is stopped if the encoder is still in the STARTED state.
39 void *streaming_thread_main(void *params)
// Bail out immediately if termination was requested before the thread started.
41 if (is_terminated) return 0;
43 INFO("streaming thread start.");
44 Encoder *encoder = ((ThreadParams*) params)->encoder;
// NOTE(review): `header` is not referenced in the visible part of this function.
45 RequestHeader *header = ((ThreadParams*) params)->request;
// poll_state: poll() result, rc: bytes read, wc: bytes written.
48 int poll_state, rc, wc;
// Two slots are declared but only poll_fd[0] is filled in and polled (nfds == 1).
49 struct pollfd poll_fd[2];
50 unsigned char buffer[BUFFFER_SIZE];
52 poll_fd[0].fd = encoder->get_fd();
53 poll_fd[0].events = POLLIN | POLLHUP;
55 while(!is_terminated) {
// Wait up to 1000 ms so the is_terminated flag is re-checked at least once a second.
56 poll_state = poll(poll_fd, 1, 1000);
57 if (poll_state == -1) {
// Reported through the catch (const trap&) handler below.
58 throw(trap("poll error."));
// poll() returned 0: the timeout expired with no event.
60 else if (poll_state == 0) {
// Encoder has data ready: copy it to the response descriptor.
63 if (poll_fd[0].revents & POLLIN) {
// Reads at most BUFFFER_SIZE - 1 bytes, leaving the last buffer byte unused.
// NOTE(review): no check of rc for 0 / -1 is visible here - confirm it is handled.
65 rc = read(encoder->get_fd(), buffer, BUFFFER_SIZE - 1);
// write() may accept fewer than rc bytes; the remainder is retried below.
70 wc = write(RESPONSE_FD, buffer, rc);
71 //DEBUG("write : %d", wc);
73 //DEBUG("need rewrite.. remain (%d)", rc - wc);
// Retry loop for a short write: remain_len is the unwritten tail, and
// (buffer + rc - remain_len) points at its first byte.
// NOTE(review): the loop ends only when rc == wc, so it relies on wc being advanced
// on a line not visible here; a -1 from write() would also need handling - verify.
75 for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) {
// NOTE(review): revents is cleared but, unlike the source thread, no poll() call is
// visible before the retry write.
76 poll_fd[0].revents = 0;
78 retry_wc = write(RESPONSE_FD, (buffer + rc - remain_len), remain_len);
81 LOG("re-write result : %d - %d", wc, rc);
// Encoder side hung up: stop transcoding if it is still running.
85 else if (poll_fd[0].revents & POLLHUP)
87 if (encoder->state == Encoder::ENCODER_STAT_STARTED) {
88 DEBUG("stop transcoding..");
89 encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0);
// Handles the trap thrown on poll failure; logs it together with the current errno.
95 catch (const trap &e) {
96 ERROR("%s %s (%d)", e.what(), strerror(errno), errno);
99 INFO("streaming thread stop.");
// Final cleanup: make sure the encoder is not left transcoding when the thread ends.
101 if (encoder->state == Encoder::ENCODER_STAT_STARTED) {
102 DEBUG("stop transcoding..");
103 encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0);
110 //----------------------------------------------------------------------
// Thread entry point that feeds the encoder with data read from the source.
//
// params: pointer to a ThreadParams; its `source`, `encoder` and `request`
// members are read. The loop polls both descriptors once per second until
// is_terminated becomes true, and copies a chunk from the source to the encoder
// whenever the encoder is writable and the source is readable.
112 void *source_thread_main(void *params)
114 Source *source = ((ThreadParams*) params)->source;
115 Encoder *encoder = ((ThreadParams*) params)->encoder;
// NOTE(review): `header` is not referenced in the visible part of this function.
116 RequestHeader *header = ((ThreadParams*) params)->request;
118 INFO("source thread start.");
// poll_state: poll() result, rc: bytes read, wc: bytes written.
121 int poll_state, rc, wc;
// Slot 0 watches the encoder for writability, slot 1 watches the source for data.
122 struct pollfd poll_fd[2];
123 unsigned char buffer[BUFFFER_SIZE];
125 poll_fd[0].fd = encoder->get_fd();
126 poll_fd[0].events = POLLOUT;
128 poll_fd[1].fd = source->get_fd();
129 poll_fd[1].events = POLLIN;
131 while(!is_terminated) {
// Wait up to 1000 ms so the is_terminated flag is re-checked at least once a second.
132 poll_state = poll(poll_fd, 2, 1000);
133 if (poll_state == -1) {
// Reported through the catch (const trap&) handler below.
134 throw(trap("poll error."));
// poll() returned 0: the timeout expired with no event.
136 else if (poll_state == 0) {
// Only pull from the source once the encoder can accept data.
140 if (poll_fd[0].revents & POLLOUT) {
142 if (poll_fd[1].revents & POLLIN) {
// Reads at most BUFFFER_SIZE - 1 bytes, leaving the last buffer byte unused.
// NOTE(review): no check of rc for 0 / -1 is visible here - confirm it is handled.
143 rc = read(source->get_fd(), buffer, BUFFFER_SIZE - 1);
// write() may accept fewer than rc bytes; the remainder is retried below.
148 wc = write(encoder->get_fd(), buffer, rc);
149 //DEBUG("write : %d", wc);
151 //DEBUG("need rewrite.. remain (%d)", rc - wc);
// Retry loop for a short write: remain_len is the unwritten tail, and
// (buffer + rc - remain_len) points at its first byte.
// NOTE(review): the loop ends only when rc == wc, so it relies on wc being advanced
// on a line not visible here - verify.
153 for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) {
154 poll_fd[0].revents = 0;
// Re-poll only the encoder descriptor (nfds == 1) before writing the remainder.
156 poll_state = poll(poll_fd, 1, 1000);
// NOTE(review): if POLLOUT is not set after the timeout, the loop step still
// subtracts retry_wc; confirm it cannot reuse a stale value from a previous pass.
157 if (poll_fd[0].revents & POLLOUT) {
158 retry_wc = write(encoder->get_fd(), (buffer + rc - remain_len), remain_len);
162 LOG("re-write result : %d - %d", wc, rc);
// Handles the trap thrown on poll failure; logs it together with the current errno.
170 catch (const trap &e) {
171 ERROR("%s %s (%d)", e.what(), strerror(errno), errno);
173 INFO("source thread stop.");
179 //----------------------------------------------------------------------
// Program entry point.
//
// Flow visible here: initialise logging, install the SIGINT handler, read and
// parse the request, create the matching source (file or live), open the
// encoder, write the response to RESPONSE_FD, optionally seek within a file
// source, program the encoder PIDs, then start the source and streaming threads
// and wait for the streaming thread to finish.
181 int main(int argc, char **argv)
// The presence of /tmp/.debug_on selects DEBUG logging; otherwise WARNING is used.
183 if (access("/tmp/.debug_on", F_OK) == 0) {
184 Logger::instance()->init("/tmp/transtreamproxy", Logger::DEBUG, false, "3.0");
187 Logger::instance()->init("/tmp/transtreamproxy", Logger::WARNING, false, "3.0");
// SIGINT ends up in do_exit() via signal_handler().
189 signal(SIGINT, signal_handler);
191 RequestHeader header;
// Despite the names, these hold the return codes of pthread_create().
193 int source_thread_id, stream_thread_id;
194 pthread_t source_thread_handle, stream_thread_handle;
// read_request() is defined outside this view.
196 std::string req = read_request();
198 DEBUG("request head :\n%s", req.c_str());
// Parse the raw request text into `header`.
199 if (header.parse_header(req)) {
// The first member starts as 0 and is filled in with the created source further down
// (thread_params.source); the other two point at the encoder and the parsed header.
202 ThreadParams thread_params = { 0, &encoder, &header };
204 int video_pid = 0, audio_pid = 0, pmt_pid = 0;
// Create the source that matches the request type and take over its PIDs.
206 switch(header.type) {
207 case REQ_TYPE_TRANSCODING_FILE:
// File request: the path comes from the "file" entry of the request extension map.
// NOTE(review): allocated with new; no matching delete is visible in this function.
209 MpegTS *ts = new MpegTS(header.extension["file"], true);
210 pmt_pid = ts->pmt_pid;
211 video_pid = ts->video_pid;
212 audio_pid = ts->audio_pid;
215 catch (const trap &e) {
216 ERROR("fail to create source : %s", e.what());
220 case REQ_TYPE_TRANSCODING_LIVE:
// Live request: the demuxer is configured from the parsed header.
// NOTE(review): allocated with new; no matching delete is visible in this function.
222 Demuxer *dmx = new Demuxer(&header);
223 pmt_pid = dmx->pmt_pid;
224 video_pid = dmx->video_pid;
225 audio_pid = dmx->audio_pid;
228 catch (const trap &e) {
229 ERROR("fail to create source : %s", e.what());
// Any other request type is rejected.
234 ERROR("not support source type (type : %d)", header.type);
237 thread_params.source = source;
// NOTE(review): the meaning of the two retry_open() arguments is defined by Encoder
// and cannot be determined from here.
239 if (!encoder.retry_open(2, 3)) {
// Everything below runs only once the encoder reports the OPENED state.
243 if (encoder.state == Encoder::ENCODER_STAT_OPENED) {
244 std::string response;
245 off_t byte_offset = 0;
// make_response() fills `response` and returns a byte offset; a negative value is
// treated as failure.
246 if ((byte_offset = make_response((ThreadParams*) &thread_params, response)) < 0) {
// NOTE(review): the write() result is ignored, so a short or failed write goes unnoticed.
251 write(RESPONSE_FD, response.c_str(), response.length());
252 DEBUG("response data :\n%s", response.c_str());
// File sources may start at an offset: by byte when no "position" was requested,
// otherwise by time.
254 if (header.type == REQ_TYPE_TRANSCODING_FILE) {
256 std::string position = header.extension["position"];
257 if (position == "") {
// NOTE(review): %llu expects unsigned long long but byte_offset is an off_t - confirm
// the format matches on the target platform.
258 DEBUG("seek to byte_offset %llu", byte_offset);
259 ((MpegTS*)source)->seek_absolute(byte_offset);
263 unsigned int position_offset = strtollu(position);
// Time seek is attempted only for time-seekable streams and a non-zero position.
264 if(((MpegTS*)source)->is_time_seekable && (position_offset > 0)) {
// NOTE(review): %d is used with an unsigned int.
265 DEBUG("seek to position_offset %ds", position_offset);
// The position is scaled by 1000 and offset by first_pcr_ms before seeking
// (presumably seconds converted to milliseconds - confirm).
266 ((MpegTS*)source)->seek_time((position_offset * 1000) + ((MpegTS*)source)->first_pcr_ms);
// A failed seek is logged as a warning only.
271 catch (const trap &e) {
272 WARNING("Exception : %s", e.what());
// Program the encoder with the PIDs taken from the source.
// NOTE(review): the do_exit() visible in this file only sets the stop flag and logs;
// what follows each call is not visible here - verify execution does not simply
// continue after a failure.
276 if (!encoder.ioctl(Encoder::IOCTL_SET_VPID, video_pid)) {
277 do_exit("fail to set video pid.");
280 if (!encoder.ioctl(Encoder::IOCTL_SET_APID, audio_pid)) {
281 do_exit("fail to set audio pid.");
284 if (!encoder.ioctl(Encoder::IOCTL_SET_PMTPID, pmt_pid)) {
285 do_exit("fail to set pmtid.");
// Clear the stop flag before any thread starts; both thread loops test it.
290 is_terminated = false;
291 source_thread_id = pthread_create(&source_thread_handle, 0, source_thread_main, (void *)&thread_params);
// NOTE(review): pthread_create() returns 0 on success and a positive error number on
// failure, so this `< 0` test can never be true; it should be `!= 0`.
292 if (source_thread_id < 0) {
293 do_exit("fail to create source thread.");
// The source thread is detached; only the streaming thread is joined below.
296 pthread_detach(source_thread_handle);
// Transcoding is started after the source thread exists and before the streaming
// thread is created.
298 if (!encoder.ioctl(Encoder::IOCTL_START_TRANSCODING, 0)) {
299 do_exit("fail to start transcoding.");
302 stream_thread_id = pthread_create(&stream_thread_handle, 0, streaming_thread_main, (void *)&thread_params);
// NOTE(review): same issue as above - a pthread_create() failure is a positive value.
303 if (stream_thread_id < 0) {
304 do_exit("fail to create stream thread.");
// Block until the streaming thread finishes.
308 pthread_join(stream_thread_handle, 0);
317 //----------------------------------------------------------------------
// Requests shutdown: sets is_terminated so both thread loops stop on their next
// check, and logs `message` at ERROR level.
// message: text describing why shutdown was requested.
// NOTE(review): no process exit is visible in this function; callers appear to rely
// on the flag alone.
319 void do_exit(const char *message)
321 is_terminated = true;
323 ERROR("%s", message);
326 //----------------------------------------------------------------------
// Signal handler installed for SIGINT in main(): logs the signal number and
// requests shutdown through do_exit().
// sig_no: number of the delivered signal.
// NOTE(review): the logging macros are invoked from signal context; confirm they are
// async-signal-safe, or reduce the handler to setting the stop flag.
328 void signal_handler(int sig_no)
330 INFO("signal no : %d", sig_no);
331 do_exit("signal detected..");
333 //----------------------------------------------------------------------