42bed62ac4abbcf9fce24f0aea2d8d5398923e0a
[vuplus_transtreamproxy] / src / main.cpp
1 /*
2  * main.cpp
3  *
4  *  Created on: 2014. 6. 10.
5  *      Author: oskwon
6  */
7
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <string.h>
11 #include <pthread.h>
12 #include <poll.h>
13 #include <errno.h>
14 #include <signal.h>
15
16 #include <string>
17
18 #include "trap.h"
19 #include "mpegts.h"
20
21 #include "Utils.h"
22 #include "Logger.h"
23
24 #include "Demuxer.h"
25 #include "Encoder.h"
26
27 using namespace std;
28 //----------------------------------------------------------------------
29
/* File descriptor the HTTP response and the transcoded stream are written to (fd 1, stdout). */
#define RESPONSE_FD  (1)
/* Size of the per-thread transfer buffers; presumably 256 MPEG-TS packets of 188 bytes - TODO confirm. */
#define BUFFFER_SIZE (188 * 256)

void signal_handler(int sig_no);
void do_exit(const char *message);

/*
 * Global termination flag.
 *
 * It is set by do_exit() (which is also reached from the SIGINT handler) and
 * polled by the worker thread loops. Because it is written from a signal
 * handler it is declared volatile sig_atomic_t, the only object type that may
 * portably be stored to from a handler. It starts out "terminated" and is
 * cleared by main() right before the worker threads are created.
 */
static volatile sig_atomic_t is_terminated = 1;
37 //----------------------------------------------------------------------
38
39 void *streaming_thread_main(void *params)
40 {
41         if (is_terminated) return 0;
42
43         INFO("streaming thread start.");
44         Encoder *encoder = ((ThreadParams*) params)->encoder;
45         RequestHeader *header = ((ThreadParams*) params)->request;
46
47         try {
48                 int poll_state, rc, wc;
49                 struct pollfd poll_fd[2];
50                 unsigned char buffer[BUFFFER_SIZE];
51
52                 poll_fd[0].fd = encoder->get_fd();
53                 poll_fd[0].events = POLLIN | POLLHUP;
54
55                 while(!is_terminated) {
56                         poll_state = poll(poll_fd, 1, 1000);
57                         if (poll_state == -1) {
58                                 throw(trap("poll error."));
59                         }
60                         else if (poll_state == 0) {
61                                 continue;
62                         }
63                         if (poll_fd[0].revents & POLLIN) {
64                                 rc = wc = 0;
65                                 rc = read(encoder->get_fd(), buffer, BUFFFER_SIZE - 1);
66                                 if (rc <= 0) {
67                                         break;
68                                 }
69                                 else if (rc > 0) {
70                                         wc = write(RESPONSE_FD, buffer, rc);
71                                         //DEBUG("write : %d", wc);
72                                         if (wc < rc) {
73                                                 //DEBUG("need rewrite.. remain (%d)", rc - wc);
74                                                 int retry_wc = 0;
75                                                 for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) {
76                                                         poll_fd[0].revents = 0;
77
78                                                         retry_wc = write(RESPONSE_FD, (buffer + rc - remain_len), remain_len);
79                                                         wc += retry_wc;
80                                                 }
81                                                 LOG("re-write result : %d - %d", wc, rc);
82                                         }
83                                 }
84                         }
85                         else if (poll_fd[0].revents & POLLHUP)
86                         {
87                                 if (encoder->state == Encoder::ENCODER_STAT_STARTED) {
88                                         DEBUG("stop transcoding..");
89                                         encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0);
90                                 }
91                                 break;
92                         }
93                 }
94         }
95         catch (const trap &e) {
96                 ERROR("%s %s (%d)", e.what(), strerror(errno), errno);
97         }
98         do_exit(0);
99         INFO("streaming thread stop.");
100
101         if (encoder->state == Encoder::ENCODER_STAT_STARTED) {
102                 DEBUG("stop transcoding..");
103                 encoder->ioctl(Encoder::IOCTL_STOP_TRANSCODING, 0);
104         }
105
106         pthread_exit(0);
107
108         return 0;
109 }
110 //----------------------------------------------------------------------
111
112 void *source_thread_main(void *params)
113 {
114         Source *source = ((ThreadParams*) params)->source;
115         Encoder *encoder = ((ThreadParams*) params)->encoder;
116         RequestHeader *header = ((ThreadParams*) params)->request;
117
118         INFO("source thread start.");
119
120         try {
121                 int poll_state, rc, wc;
122                 struct pollfd poll_fd[2];
123                 unsigned char buffer[BUFFFER_SIZE];
124
125                 poll_fd[0].fd = encoder->get_fd();
126                 poll_fd[0].events = POLLOUT;
127
128                 poll_fd[1].fd = source->get_fd();
129                 poll_fd[1].events = POLLIN;
130
131                 while(!is_terminated) {
132                         poll_state = poll(poll_fd, 2, 1000);
133                         if (poll_state == -1) {
134                                 throw(trap("poll error."));
135                         }
136                         else if (poll_state == 0) {
137                                 continue;
138                         }
139
140                         if (poll_fd[0].revents & POLLOUT) {
141                                 rc = wc = 0;
142                                 if (poll_fd[1].revents & POLLIN) {
143                                         rc = read(source->get_fd(), buffer, BUFFFER_SIZE - 1);
144                                         if (rc == 0) {
145                                                 break;
146                                         }
147                                         else if (rc > 0) {
148                                                 wc = write(encoder->get_fd(), buffer, rc);
149                                                 //DEBUG("write : %d", wc);
150                                                 if (wc < rc) {
151                                                         //DEBUG("need rewrite.. remain (%d)", rc - wc);
152                                                         int retry_wc = 0;
153                                                         for (int remain_len = rc - wc; rc != wc; remain_len -= retry_wc) {
154                                                                 poll_fd[0].revents = 0;
155
156                                                                 poll_state = poll(poll_fd, 1, 1000);
157                                                                 if (poll_fd[0].revents & POLLOUT) {
158                                                                         retry_wc = write(encoder->get_fd(), (buffer + rc - remain_len), remain_len);
159                                                                         wc += retry_wc;
160                                                                 }
161                                                         }
162                                                         LOG("re-write result : %d - %d", wc, rc);
163                                                         usleep(500000);
164                                                 }
165                                         }
166                                 }
167                         }
168                 }
169         }
170         catch (const trap &e) {
171                 ERROR("%s %s (%d)", e.what(), strerror(errno), errno);
172         }
173         INFO("source thread stop.");
174
175         pthread_exit(0);
176
177         return 0;
178 }
179 //----------------------------------------------------------------------
180
181 int main(int argc, char **argv)
182 {
183         if (access("/tmp/.debug_on", F_OK) == 0) {
184                 Logger::instance()->init("/tmp/transtreamproxy", Logger::DEBUG, false, "3.0");
185         }
186         else {
187                 Logger::instance()->init("/tmp/transtreamproxy", Logger::WARNING, false, "3.0");
188         }
189         signal(SIGINT, signal_handler);
190
191         RequestHeader header;
192
193         int source_thread_id, stream_thread_id;
194         pthread_t source_thread_handle, stream_thread_handle;
195
196         std::string req = read_request();
197
198         DEBUG("request head :\n%s", req.c_str());
199         if (header.parse_header(req)) {
200                 Encoder encoder;
201                 Source *source = 0;
202                 ThreadParams thread_params = { 0, &encoder, &header };
203
204                 int video_pid = 0, audio_pid = 0, pmt_pid = 0;
205
206                 switch(header.type) {
207                 case REQ_TYPE_TRANSCODING_FILE:
208                         try {
209                                 MpegTS *ts = new MpegTS(header.extension["file"], true);
210                                 pmt_pid   = ts->pmt_pid;
211                                 video_pid = ts->video_pid;
212                                 audio_pid = ts->audio_pid;
213                                 source = ts;
214                         }
215                         catch (const trap &e) {
216                                 ERROR("fail to create source : %s", e.what());
217                                 exit(-1);
218                         }
219                         break;
220                 case REQ_TYPE_TRANSCODING_LIVE:
221                         try {
222                                 Demuxer *dmx = new Demuxer(&header);
223                                 pmt_pid   = dmx->pmt_pid;
224                                 video_pid = dmx->video_pid;
225                                 audio_pid = dmx->audio_pid;
226                                 source = dmx;
227                         }
228                         catch (const trap &e) {
229                                 ERROR("fail to create source : %s", e.what());
230                                 exit(-1);
231                         }
232                         break;
233                 default:
234                         ERROR("not support source type (type : %d)", header.type);
235                         exit(-1);
236                 }
237                 thread_params.source = source;
238
239                 if (!encoder.retry_open(2, 3)) {
240                         exit(-1);
241                 }
242
243                 if (encoder.state == Encoder::ENCODER_STAT_OPENED) {
244                         std::string response;
245                         off_t byte_offset = 0;
246                         if ((byte_offset = make_response((ThreadParams*) &thread_params, response)) < 0) {
247                                 do_exit(0);
248                                 return 0;
249                         }
250
251                         write(RESPONSE_FD, response.c_str(), response.length());
252                         DEBUG("response data :\n%s", response.c_str());
253
254                         if (header.type == REQ_TYPE_TRANSCODING_FILE) {
255                                 try {
256                                         std::string position = header.extension["position"];
257                                         if (position == "") {
258                                                 DEBUG("seek to byte_offset %llu", byte_offset);
259                                                 ((MpegTS*)source)->seek_absolute(byte_offset);
260                                                 DEBUG("seek ok");
261                                         }
262                                         else {
263                                                 unsigned int position_offset = strtollu(position);
264                                                 if(((MpegTS*)source)->is_time_seekable && (position_offset > 0)) {
265                                                         DEBUG("seek to position_offset %ds", position_offset);
266                                                         ((MpegTS*)source)->seek_time((position_offset * 1000) + ((MpegTS*)source)->first_pcr_ms);
267                                                         DEBUG("seek ok");
268                                                 }
269                                         }
270                                 }
271                                 catch (const trap &e) {
272                                         WARNING("Exception : %s", e.what());
273                                 }
274                         }
275
276                         if (!encoder.ioctl(Encoder::IOCTL_SET_VPID, video_pid)) {
277                                 do_exit("fail to set video pid.");
278                                 exit(-1);
279                         }
280                         if (!encoder.ioctl(Encoder::IOCTL_SET_APID, audio_pid)) {
281                                 do_exit("fail to set audio pid.");
282                                 exit(-1);
283                         }
284                         if (!encoder.ioctl(Encoder::IOCTL_SET_PMTPID, pmt_pid)) {
285                                 do_exit("fail to set pmtid.");
286                                 exit(-1);
287                         }
288                 }
289
290                 is_terminated = false;
291                 source_thread_id = pthread_create(&source_thread_handle, 0, source_thread_main, (void *)&thread_params);
292                 if (source_thread_id < 0) {
293                         do_exit("fail to create source thread.");
294                 }
295                 else {
296                         pthread_detach(source_thread_handle);
297                         sleep(1);
298                         if (!encoder.ioctl(Encoder::IOCTL_START_TRANSCODING, 0)) {
299                                 do_exit("fail to start transcoding.");
300                         }
301                         else {
302                                 stream_thread_id = pthread_create(&stream_thread_handle, 0, streaming_thread_main, (void *)&thread_params);
303                                 if (stream_thread_id < 0) {
304                                         do_exit("fail to create stream thread.");
305                                 }
306                         }
307                 }
308                 pthread_join(stream_thread_handle, 0);
309
310                 if (source != 0) {
311                         delete source;
312                         source = 0;
313                 }
314         }
315         return 0;
316 }
317 //----------------------------------------------------------------------
318
319 void do_exit(const char *message)
320 {
321         is_terminated = true;
322         if (message) {
323                 ERROR("%s", message);
324         }
325 }
326 //----------------------------------------------------------------------
327
328 void signal_handler(int sig_no)
329 {
330         INFO("signal no : %d", sig_no);
331         do_exit("signal detected..");
332 }
333 //----------------------------------------------------------------------