summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoroskwon <kos@dev3>2014-06-12 03:54:12 (GMT)
committeroskwon <kos@dev3>2014-06-12 07:41:23 (GMT)
commit7502eda44f0144f415808e2d372b061e40885b4e (patch)
tree82716838f47d62edfb374950c23ed8f0d90bb984
parentbcfacba238ee3e4e2f04c71293841734d0444311 (diff)
Add session control module at encoder.
-rw-r--r--src/Encoder.cpp171
-rw-r--r--src/Encoder.h35
-rw-r--r--src/Makefile2
-rw-r--r--src/SharedMemory.h80
-rw-r--r--src/Utils.cpp63
-rw-r--r--src/Utils.h7
-rw-r--r--src/main.cpp4
7 files changed, 348 insertions, 14 deletions
diff --git a/src/Encoder.cpp b/src/Encoder.cpp
index 23f66e0..37d7a83 100644
--- a/src/Encoder.cpp
+++ b/src/Encoder.cpp
@@ -7,6 +7,8 @@
#include <stdio.h>
#include <fcntl.h>
+#include <dirent.h>
+#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
@@ -17,10 +19,59 @@
using namespace std;
//----------------------------------------------------------------------
-Encoder::Encoder()
+Encoder::Encoder() throw(trap)
{
- fd = -1;
- state = ENCODER_STAT_INIT;
+ encoder_id = fd = -1;
+ max_encodr_count = state = ENCODER_STAT_INIT;
+
+ DIR* d = opendir("/dev");
+ if (d != 0) {
+ struct dirent* de;
+ while ((de = readdir(d)) != 0) {
+ if (strncmp("bcm_enc", de->d_name, 7) == 0) {
+ max_encodr_count++;
+ }
+ }
+ closedir(d);
+ }
+
+ mSemId = 0;
+ mShmFd = 0;
+ mShmData = 0;
+
+ mSemName = "/tsp_session_sem";
+ mShmName = "/tsp_session_shm";
+ mShmSize = sizeof(Session) * max_encodr_count;
+
+ if (Open() == false)
+ throw(trap("session ctrl init fail."));
+ DEBUG("shm-info : fd [%d], name [%s], size [%d], data [%p]", mShmFd, mShmName.c_str(), mShmSize, mShmData);
+ DEBUG("sem-info : id [%p], name [%s]", mSemId, mSemName.c_str());
+
+ std::vector<int> pidlist = find_process_by_name("transtreamproxy", 0);
+
+ session_dump("before init.");
+
+ Wait();
+ for (int i = 0; i < max_encodr_count; i++) {
+ if (mShmData[i].pid != 0) {
+ int pid = mShmData[i].pid;
+ if(session_terminated(pidlist, pid)) {
+ session_erase(pid);
+ }
+ }
+ }
+ Post();
+
+ int mypid = getpid();
+ std::string ipaddr = get_host_addr();
+ if (session_already_exist(ipaddr) > 0) {
+ encoder_id = session_update(ipaddr, mypid);
+ }
+ else {
+ encoder_id = session_register(ipaddr, mypid);
+ }
+ DEBUG("encoder_device_id : %d", encoder_id);
}
//----------------------------------------------------------------------
@@ -37,7 +88,7 @@ Encoder::~Encoder()
}
//----------------------------------------------------------------------
-bool Encoder::open(int encoder_id)
+bool Encoder::encoder_open()
{
std::string path = "/dev/bcm_enc" + ultostr(encoder_id);
fd = ::open(path.c_str(), O_RDWR, 0);
@@ -49,10 +100,10 @@ bool Encoder::open(int encoder_id)
}
//----------------------------------------------------------------------
-bool Encoder::retry_open(int encoder_id, int retry_count, int sleep_time)
+bool Encoder::retry_open(int retry_count, int sleep_time)
{
for (int i = 0; i < retry_count; ++i) {
- if (open(encoder_id)) {
+ if (encoder_open()) {
DEBUG("encoder-%d open success..", encoder_id);
return true;
}
@@ -61,6 +112,7 @@ bool Encoder::retry_open(int encoder_id, int retry_count, int sleep_time)
}
return false;
}
+//----------------------------------------------------------------------
bool Encoder::ioctl(int cmd, int value)
{
@@ -83,3 +135,110 @@ int Encoder::get_fd()
return fd;
}
//----------------------------------------------------------------------
+
+void Encoder::session_dump(const char* aMessage)
+{
+ DUMMY(" >> %s", aMessage);
+ DUMMY("-------- [ DUMP HOST INFO ] ---------");
+ for (int i = 0; i < max_encodr_count; i++) {
+ DUMMY("%d : ip [%s], pid [%d]", i, mShmData[i].ip, mShmData[i].pid);
+ }
+ DUMMY("-------------------------------------");
+}
+//----------------------------------------------------------------------
+
+bool Encoder::session_terminated(std::vector<int>& aList, int aPid)
+{
+ for (int i = 0; i < aList.size(); ++i) {
+ if (aList[i] == aPid) {
+ return false;
+ }
+ }
+ return true;
+}
+//----------------------------------------------------------------------
+
+int Encoder::session_register(std::string aIpAddr, int aPid)
+{
+ int i = 0;
+ bool result = false;
+
+ Wait();
+ for (; i < max_encodr_count; i++) {
+ if (mShmData[i].pid == 0) {
+ result = true;
+ mShmData[i].pid = aPid;
+ strcpy(mShmData[i].ip, aIpAddr.c_str());
+ break;
+ }
+ }
+ Post();
+ session_dump("after register.");
+
+ return result ? i : -1;
+}
+//----------------------------------------------------------------------
+
+void Encoder::session_unregister(std::string aIpAddr)
+{
+ Wait();
+ for (int i = 0; i < max_encodr_count; i++) {
+ if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) {
+ memset(mShmData[i].ip, 0, 16);
+ mShmData[i].pid = 0;
+ break;
+ }
+ }
+ Post();
+ session_dump("after unregister.");
+}
+//----------------------------------------------------------------------
+
+void Encoder::session_erase(int aPid)
+{
+ for (int i = 0; i < max_encodr_count; i++) {
+ if (mShmData[i].pid == aPid) {
+ DEBUG("erase.. %s : %d", mShmData[i].ip, mShmData[i].pid);
+ memset(mShmData[i].ip, 0, 16);
+ mShmData[i].pid = 0;
+ break;
+ }
+ }
+}
+//----------------------------------------------------------------------
+
+int Encoder::session_update(std::string aIpAddr, int aPid)
+{
+ int i = 0;
+ bool result = false;
+
+ session_dump("before update.");
+ Wait();
+ for (; i < max_encodr_count; i++) {
+ if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) {
+ result = true;
+ kill_process(mShmData[i].pid);
+ memset(mShmData[i].ip, 0, 16);
+ mShmData[i].pid = 0;
+ break;
+ }
+ }
+ Post();
+ session_register(aIpAddr, aPid);
+ return result ? i : -1;
+}
+//----------------------------------------------------------------------
+
+int Encoder::session_already_exist(std::string aIpAddr)
+{
+ int existCount = 0;
+ Wait();
+ for (int i = 0; i < max_encodr_count; i++) {
+ if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) {
+ existCount++;
+ }
+ }
+ Post();
+ return existCount;
+}
+//----------------------------------------------------------------------
diff --git a/src/Encoder.h b/src/Encoder.h
index 3ffea57..0760dbb 100644
--- a/src/Encoder.h
+++ b/src/Encoder.h
@@ -8,7 +8,20 @@
#ifndef ENCODER_H_
#define ENCODER_H_
-class Encoder
+#include <string>
+
+#include "trap.h"
+
+#include "SharedMemory.h"
+//----------------------------------------------------------------------
+
+typedef struct _session_t {
+ int pid;
+ char ip[16];
+} Session;
+//----------------------------------------------------------------------
+
+class Encoder : public SharedMemory<Session>
{
private:
int fd;
@@ -30,16 +43,30 @@ public:
};
int state;
+ int encoder_id;
+ int max_encodr_count;
+
+protected:
+ void session_dump(const char* aMessage);
+
+ void session_erase(int aPid);
+ int session_register(std::string aIpAddr, int aPid);
+ void session_unregister(std::string aIpAddr);
+
+ int session_update(std::string aIpAddr, int aPid);
+ bool session_terminated(std::vector<int>& aList, int aPid);
+ int session_already_exist(std::string aIpAddr);
+
protected:
- bool open(int encoder_id);
+ bool encoder_open();
public:
- Encoder();
+ Encoder() throw(trap);
virtual ~Encoder();
int get_fd();
bool ioctl(int cmd, int value);
- bool retry_open(int encoder_id, int retry_count, int sleep_time);
+ bool retry_open(int retry_count, int sleep_time);
};
//----------------------------------------------------------------------
diff --git a/src/Makefile b/src/Makefile
index 397fde2..99389a4 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -41,7 +41,7 @@ OBJS=$(SRCS:.cpp=.o)
.cpp.o:
$(Q)echo "Compile... "$<
- $(Q)$(CXX) $(CFLAGS) -c $< -o $(subst .cpp,.o,$<)
+ $(Q)$(CXX) -c $(CFLAGS) -o $@ $<
all: .showinfo $(BIN)
diff --git a/src/SharedMemory.h b/src/SharedMemory.h
new file mode 100644
index 0000000..88c1d02
--- /dev/null
+++ b/src/SharedMemory.h
@@ -0,0 +1,80 @@
+/*
+ * SharedMemory.h
+ *
+ * Created on: 2014. 6. 12.
+ * Author: oskwon
+ */
+
+#ifndef SHAREDMEMORY_H_
+#define SHAREDMEMORY_H_
+
+#include <string>
+
+#include <fcntl.h>
+#include <unistd.h>
+#include <semaphore.h>
+#include <sys/mman.h>
+
+using namespace std;
+//-------------------------------------------------------------------------------
+
+template <class T>
+class SharedMemory
+{
+protected:
+ sem_t* mSemId;
+ std::string mSemName;
+
+ int mShmFd;
+ int mShmSize;
+ std::string mShmName;
+
+ T* mShmData;
+
+protected:
+ void Close()
+ {
+ if (mShmData > 0) {
+ munmap(mShmData, mShmSize);
+ }
+ mShmData = 0;
+ if (mShmFd > 0) {
+ close(mShmFd);
+ //shm_unlink(mShmName.c_str());
+ }
+ mShmFd = 0;
+ if (mSemId > 0) {
+ sem_close(mSemId);
+ sem_unlink(mSemName.c_str());
+ }
+ mSemId = 0;
+ }
+
+ bool Open()
+ {
+ mShmFd = shm_open(mShmName.c_str(), O_CREAT | O_RDWR, S_IRWXU | S_IRWXG);
+ if (mShmFd < 0) {
+ return false;
+ }
+ ftruncate(mShmFd, mShmSize);
+
+ mShmData = (T*) mmap(NULL, mShmSize, PROT_READ | PROT_WRITE, MAP_SHARED, mShmFd, 0);
+ if (mShmData == 0) {
+ return false;
+ }
+ mSemId = sem_open(mSemName.c_str(), O_CREAT, S_IRUSR | S_IWUSR, 1);
+ return true;
+ }
+
+ void Wait() { sem_wait(mSemId); }
+ void Post() { sem_post(mSemId); }
+
+public:
+ ~SharedMemory()
+ {
+ Close();
+ }
+};
+//-------------------------------------------------------------------------------
+
+#endif /* UPOSIXSHAREDMEMORY_H_ */
diff --git a/src/Utils.cpp b/src/Utils.cpp
index 9345e59..6da67f2 100644
--- a/src/Utils.cpp
+++ b/src/Utils.cpp
@@ -8,8 +8,15 @@
#include <errno.h>
#include <stdarg.h>
#include <string.h>
+#include <dirent.h>
+#include <signal.h>
+#include <sys/wait.h>
+
+#include <arpa/inet.h>
+#include <sys/socket.h>
#include <sstream>
+#include <fstream>
#include "mpegts.h"
@@ -118,7 +125,6 @@ bool RequestHeader::parse_header(std::string header)
type = REQ_TYPE_TRANSCODING_FILE;
decoded_path = UriDecoder().decode(value.c_str());
}
-
DEBUG("info (%d) -> type : [%s], path : [%s], version : [%s]", infos.size(), method.c_str(), path.c_str(), version.c_str());
for (++iter; iter != lines.end(); ++iter) {
@@ -191,3 +197,58 @@ void Util::vlog(const char * format, ...) throw()
WARNING("%s", vlog_buffer);
}
//----------------------------------------------------------------------
+
+std::string get_host_addr()
+{
+ std::stringstream ss;
+ struct sockaddr_in addr;
+ socklen_t addrlen = sizeof(addr);
+
+ getpeername(0, (struct sockaddr*)&addr, &addrlen);
+ ss << inet_ntoa(addr.sin_addr);
+
+ return ss.str();
+}
+//-------------------------------------------------------------------------------
+
+std::vector<int> find_process_by_name(std::string name, int mypid)
+{
+ std::vector<int> pidlist;
+ char cmdlinepath[256] = {0};
+ DIR* d = opendir("/proc");
+ if (d != 0) {
+ struct dirent* de;
+ while ((de = readdir(d)) != 0) {
+ int pid = atoi(de->d_name);
+ if (pid > 0) {
+ sprintf(cmdlinepath, "/proc/%s/cmdline", de->d_name);
+
+ std::string cmdline;
+ std::ifstream cmdlinefile(cmdlinepath);
+ std::getline(cmdlinefile, cmdline);
+ if (!cmdline.empty()) {
+ size_t pos = cmdline.find('\0');
+ if (pos != string::npos)
+ cmdline = cmdline.substr(0, pos);
+ pos = cmdline.rfind('/');
+ if (pos != string::npos)
+ cmdline = cmdline.substr(pos + 1);
+ if ((name == cmdline) && ((mypid != pid) || (mypid == 0))) {
+ pidlist.push_back(pid);
+ }
+ }
+ }
+ }
+ closedir(d);
+ }
+ return pidlist;
+}
+//-------------------------------------------------------------------------------
+
+void kill_process(int pid)
+{
+ int result = kill(pid, SIGINT);
+ DEBUG("SEND SIGINT to %d, result : %d", pid, result);
+ sleep(1);
+}
+//----------------------------------------------------------------------
diff --git a/src/Utils.h b/src/Utils.h
index bfeee16..3208357 100644
--- a/src/Utils.h
+++ b/src/Utils.h
@@ -32,7 +32,7 @@ typedef enum {
REQ_TYPE_LIVE,
REQ_TYPE_TRANSCODING_LIVE,
REQ_TYPE_FILE,
- REQ_TYPE_TRANSCODING_FILE,
+ REQ_TYPE_TRANSCODING_FILE
} RequestType;
//----------------------------------------------------------------------
@@ -76,4 +76,9 @@ public:
};
//----------------------------------------------------------------------
+void kill_process(int pid);
+std::string get_host_addr();
+std::vector<int> find_process_by_name(std::string name, int mypid);
+//----------------------------------------------------------------------
+
#endif /* UTILS_H_ */
diff --git a/src/main.cpp b/src/main.cpp
index b1b8ea9..da17119 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -238,7 +238,7 @@ int main(int argc, char **argv)
}
thread_params.source = source;
- if (!encoder.retry_open(0, 2, 3)) {
+ if (!encoder.retry_open(2, 3)) {
exit(-1);
}
@@ -328,5 +328,7 @@ void signal_handler(int sig_no)
{
INFO("signal no : %d", sig_no);
do_exit("signal detected..");
+ usleep(500000);
+ exit(0);
}
//----------------------------------------------------------------------