diff options
author | oskwon <kos@dev3> | 2014-06-12 03:54:12 (GMT) |
---|---|---|
committer | oskwon <kos@dev3> | 2014-06-12 07:41:23 (GMT) |
commit | 7502eda44f0144f415808e2d372b061e40885b4e (patch) | |
tree | 82716838f47d62edfb374950c23ed8f0d90bb984 | |
parent | bcfacba238ee3e4e2f04c71293841734d0444311 (diff) |
Add session control module at encoder.
-rw-r--r-- | src/Encoder.cpp | 171 | ||||
-rw-r--r-- | src/Encoder.h | 35 | ||||
-rw-r--r-- | src/Makefile | 2 | ||||
-rw-r--r-- | src/SharedMemory.h | 80 | ||||
-rw-r--r-- | src/Utils.cpp | 63 | ||||
-rw-r--r-- | src/Utils.h | 7 | ||||
-rw-r--r-- | src/main.cpp | 4 |
7 files changed, 348 insertions, 14 deletions
diff --git a/src/Encoder.cpp b/src/Encoder.cpp index 23f66e0..37d7a83 100644 --- a/src/Encoder.cpp +++ b/src/Encoder.cpp @@ -7,6 +7,8 @@ #include <stdio.h> #include <fcntl.h> +#include <dirent.h> +#include <string.h> #include <unistd.h> #include <sys/ioctl.h> @@ -17,10 +19,59 @@ using namespace std; //---------------------------------------------------------------------- -Encoder::Encoder() +Encoder::Encoder() throw(trap) { - fd = -1; - state = ENCODER_STAT_INIT; + encoder_id = fd = -1; + max_encodr_count = state = ENCODER_STAT_INIT; + + DIR* d = opendir("/dev"); + if (d != 0) { + struct dirent* de; + while ((de = readdir(d)) != 0) { + if (strncmp("bcm_enc", de->d_name, 7) == 0) { + max_encodr_count++; + } + } + closedir(d); + } + + mSemId = 0; + mShmFd = 0; + mShmData = 0; + + mSemName = "/tsp_session_sem"; + mShmName = "/tsp_session_shm"; + mShmSize = sizeof(Session) * max_encodr_count; + + if (Open() == false) + throw(trap("session ctrl init fail.")); + DEBUG("shm-info : fd [%d], name [%s], size [%d], data [%p]", mShmFd, mShmName.c_str(), mShmSize, mShmData); + DEBUG("sem-info : id [%p], name [%s]", mSemId, mSemName.c_str()); + + std::vector<int> pidlist = find_process_by_name("transtreamproxy", 0); + + session_dump("before init."); + + Wait(); + for (int i = 0; i < max_encodr_count; i++) { + if (mShmData[i].pid != 0) { + int pid = mShmData[i].pid; + if(session_terminated(pidlist, pid)) { + session_erase(pid); + } + } + } + Post(); + + int mypid = getpid(); + std::string ipaddr = get_host_addr(); + if (session_already_exist(ipaddr) > 0) { + encoder_id = session_update(ipaddr, mypid); + } + else { + encoder_id = session_register(ipaddr, mypid); + } + DEBUG("encoder_device_id : %d", encoder_id); } //---------------------------------------------------------------------- @@ -37,7 +88,7 @@ Encoder::~Encoder() } //---------------------------------------------------------------------- -bool Encoder::open(int encoder_id) +bool Encoder::encoder_open() { std::string path = "/dev/bcm_enc" + ultostr(encoder_id); fd = ::open(path.c_str(), O_RDWR, 0); @@ -49,10 +100,10 @@ bool Encoder::open(int encoder_id) } //---------------------------------------------------------------------- -bool Encoder::retry_open(int encoder_id, int retry_count, int sleep_time) +bool Encoder::retry_open(int retry_count, int sleep_time) { for (int i = 0; i < retry_count; ++i) { - if (open(encoder_id)) { + if (encoder_open()) { DEBUG("encoder-%d open success..", encoder_id); return true; } @@ -61,6 +112,7 @@ bool Encoder::retry_open(int encoder_id, int retry_count, int sleep_time) } return false; } +//---------------------------------------------------------------------- bool Encoder::ioctl(int cmd, int value) { @@ -83,3 +135,110 @@ int Encoder::get_fd() return fd; } //---------------------------------------------------------------------- + +void Encoder::session_dump(const char* aMessage) +{ + DUMMY(" >> %s", aMessage); + DUMMY("-------- [ DUMP HOST INFO ] ---------"); + for (int i = 0; i < max_encodr_count; i++) { + DUMMY("%d : ip [%s], pid [%d]", i, mShmData[i].ip, mShmData[i].pid); + } + DUMMY("-------------------------------------"); +} +//---------------------------------------------------------------------- + +bool Encoder::session_terminated(std::vector<int>& aList, int aPid) +{ + for (int i = 0; i < aList.size(); ++i) { + if (aList[i] == aPid) { + return false; + } + } + return true; +} +//---------------------------------------------------------------------- + +int Encoder::session_register(std::string aIpAddr, int aPid) +{ + int i = 0; + bool result = false; + + Wait(); + for (; i < max_encodr_count; i++) { + if (mShmData[i].pid == 0) { + result = true; + mShmData[i].pid = aPid; + strcpy(mShmData[i].ip, aIpAddr.c_str()); + break; + } + } + Post(); + session_dump("after register."); + + return result ? i : -1; +} +//---------------------------------------------------------------------- + +void Encoder::session_unregister(std::string aIpAddr) +{ + Wait(); + for (int i = 0; i < max_encodr_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } + Post(); + session_dump("after unregister."); +} +//---------------------------------------------------------------------- + +void Encoder::session_erase(int aPid) +{ + for (int i = 0; i < max_encodr_count; i++) { + if (mShmData[i].pid == aPid) { + DEBUG("erase.. %s : %d", mShmData[i].ip, mShmData[i].pid); + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } +} +//---------------------------------------------------------------------- + +int Encoder::session_update(std::string aIpAddr, int aPid) +{ + int i = 0; + bool result = false; + + session_dump("before update."); + Wait(); + for (; i < max_encodr_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + result = true; + kill_process(mShmData[i].pid); + memset(mShmData[i].ip, 0, 16); + mShmData[i].pid = 0; + break; + } + } + Post(); + session_register(aIpAddr, aPid); + return result ? i : -1; +} +//---------------------------------------------------------------------- + +int Encoder::session_already_exist(std::string aIpAddr) +{ + int existCount = 0; + Wait(); + for (int i = 0; i < max_encodr_count; i++) { + if (strcmp(mShmData[i].ip, aIpAddr.c_str()) == 0) { + existCount++; + } + } + Post(); + return existCount; +} +//---------------------------------------------------------------------- diff --git a/src/Encoder.h b/src/Encoder.h index 3ffea57..0760dbb 100644 --- a/src/Encoder.h +++ b/src/Encoder.h @@ -8,7 +8,20 @@ #ifndef ENCODER_H_ #define ENCODER_H_ -class Encoder +#include <string> + +#include "trap.h" + +#include "SharedMemory.h" +//---------------------------------------------------------------------- + +typedef struct _session_t { + int pid; + char ip[16]; +} Session; +//---------------------------------------------------------------------- + +class Encoder : public SharedMemory<Session> { private: int fd; @@ -30,16 +43,30 @@ public: }; int state; + int encoder_id; + int max_encodr_count; + +protected: + void session_dump(const char* aMessage); + + void session_erase(int aPid); + int session_register(std::string aIpAddr, int aPid); + void session_unregister(std::string aIpAddr); + + int session_update(std::string aIpAddr, int aPid); + bool session_terminated(std::vector<int>& aList, int aPid); + int session_already_exist(std::string aIpAddr); + protected: - bool open(int encoder_id); + bool encoder_open(); public: - Encoder(); + Encoder() throw(trap); virtual ~Encoder(); int get_fd(); bool ioctl(int cmd, int value); - bool retry_open(int encoder_id, int retry_count, int sleep_time); + bool retry_open(int retry_count, int sleep_time); }; //---------------------------------------------------------------------- diff --git a/src/Makefile b/src/Makefile index 397fde2..99389a4 100644 --- a/src/Makefile +++ b/src/Makefile @@ -41,7 +41,7 @@ OBJS=$(SRCS:.cpp=.o) .cpp.o: $(Q)echo "Compile... "$< - $(Q)$(CXX) $(CFLAGS) -c $< -o $(subst .cpp,.o,$<) + $(Q)$(CXX) -c $(CFLAGS) -o $@ $< all: .showinfo $(BIN) diff --git a/src/SharedMemory.h b/src/SharedMemory.h new file mode 100644 index 0000000..88c1d02 --- /dev/null +++ b/src/SharedMemory.h @@ -0,0 +1,80 @@ +/* + * SharedMemory.h + * + * Created on: 2014. 6. 12. + * Author: oskwon + */ + +#ifndef SHAREDMEMORY_H_ +#define SHAREDMEMORY_H_ + +#include <string> + +#include <fcntl.h> +#include <unistd.h> +#include <semaphore.h> +#include <sys/mman.h> + +using namespace std; +//------------------------------------------------------------------------------- + +template <class T> +class SharedMemory +{ +protected: + sem_t* mSemId; + std::string mSemName; + + int mShmFd; + int mShmSize; + std::string mShmName; + + T* mShmData; + +protected: + void Close() + { + if (mShmData > 0) { + munmap(mShmData, mShmSize); + } + mShmData = 0; + if (mShmFd > 0) { + close(mShmFd); + //shm_unlink(mShmName.c_str()); + } + mShmFd = 0; + if (mSemId > 0) { + sem_close(mSemId); + sem_unlink(mSemName.c_str()); + } + mSemId = 0; + } + + bool Open() + { + mShmFd = shm_open(mShmName.c_str(), O_CREAT | O_RDWR, S_IRWXU | S_IRWXG); + if (mShmFd < 0) { + return false; + } + ftruncate(mShmFd, mShmSize); + + mShmData = (T*) mmap(NULL, mShmSize, PROT_READ | PROT_WRITE, MAP_SHARED, mShmFd, 0); + if (mShmData == 0) { + return false; + } + mSemId = sem_open(mSemName.c_str(), O_CREAT, S_IRUSR | S_IWUSR, 1); + return true; + } + + void Wait() { sem_wait(mSemId); } + void Post() { sem_post(mSemId); } + +public: + ~SharedMemory() + { + Close(); + } +}; +//------------------------------------------------------------------------------- + +#endif /* UPOSIXSHAREDMEMORY_H_ */ diff --git a/src/Utils.cpp b/src/Utils.cpp index 9345e59..6da67f2 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -8,8 +8,15 @@ #include <errno.h> #include <stdarg.h> #include <string.h> +#include <dirent.h> +#include <signal.h> +#include <sys/wait.h> + +#include <arpa/inet.h> +#include <sys/socket.h> #include <sstream> +#include <fstream> #include "mpegts.h" @@ -118,7 +125,6 @@ bool RequestHeader::parse_header(std::string header) type = REQ_TYPE_TRANSCODING_FILE; decoded_path = UriDecoder().decode(value.c_str()); } - DEBUG("info (%d) -> type : [%s], path : [%s], version : [%s]", infos.size(), method.c_str(), path.c_str(), version.c_str()); for (++iter; iter != lines.end(); ++iter) { @@ -191,3 +197,58 @@ void Util::vlog(const char * format, ...) throw() WARNING("%s", vlog_buffer); } //---------------------------------------------------------------------- + +std::string get_host_addr() +{ + std::stringstream ss; + struct sockaddr_in addr; + socklen_t addrlen = sizeof(addr); + + getpeername(0, (struct sockaddr*)&addr, &addrlen); + ss << inet_ntoa(addr.sin_addr); + + return ss.str(); +} +//------------------------------------------------------------------------------- + +std::vector<int> find_process_by_name(std::string name, int mypid) +{ + std::vector<int> pidlist; + char cmdlinepath[256] = {0}; + DIR* d = opendir("/proc"); + if (d != 0) { + struct dirent* de; + while ((de = readdir(d)) != 0) { + int pid = atoi(de->d_name); + if (pid > 0) { + sprintf(cmdlinepath, "/proc/%s/cmdline", de->d_name); + + std::string cmdline; + std::ifstream cmdlinefile(cmdlinepath); + std::getline(cmdlinefile, cmdline); + if (!cmdline.empty()) { + size_t pos = cmdline.find('\0'); + if (pos != string::npos) + cmdline = cmdline.substr(0, pos); + pos = cmdline.rfind('/'); + if (pos != string::npos) + cmdline = cmdline.substr(pos + 1); + if ((name == cmdline) && ((mypid != pid) || (mypid == 0))) { + pidlist.push_back(pid); + } + } + } + } + closedir(d); + } + return pidlist; +} +//------------------------------------------------------------------------------- + +void kill_process(int pid) +{ + int result = kill(pid, SIGINT); + DEBUG("SEND SIGINT to %d, result : %d", pid, result); + sleep(1); +} +//---------------------------------------------------------------------- diff --git a/src/Utils.h b/src/Utils.h index bfeee16..3208357 100644 --- a/src/Utils.h +++ b/src/Utils.h @@ -32,7 +32,7 @@ typedef enum { REQ_TYPE_LIVE, REQ_TYPE_TRANSCODING_LIVE, REQ_TYPE_FILE, - REQ_TYPE_TRANSCODING_FILE, + REQ_TYPE_TRANSCODING_FILE } RequestType; //---------------------------------------------------------------------- @@ -76,4 +76,9 @@ public: }; //---------------------------------------------------------------------- +void kill_process(int pid); +std::string get_host_addr(); +std::vector<int> find_process_by_name(std::string name, int mypid); +//---------------------------------------------------------------------- + #endif /* UTILS_H_ */ diff --git a/src/main.cpp b/src/main.cpp index b1b8ea9..da17119 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -238,7 +238,7 @@ int main(int argc, char **argv) } thread_params.source = source; - if (!encoder.retry_open(0, 2, 3)) { + if (!encoder.retry_open(2, 3)) { exit(-1); } @@ -328,5 +328,7 @@ void signal_handler(int sig_no) { INFO("signal no : %d", sig_no); do_exit("signal detected.."); + usleep(500000); + exit(0); } //---------------------------------------------------------------------- |