aba05421d1bc7043015739e185a62b733f8e8756
[vuplus_xbmc] / xbmc / filesystem / PipesManager.cpp
1 /*
2  *      Copyright (C) 2011-2013 Team XBMC
3  *      http://xbmc.org
4  *
5  *  This Program is free software; you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation; either version 2, or (at your option)
8  *  any later version.
9  *
10  *  This Program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with XBMC; see the file COPYING.  If not, see
17  *  <http://www.gnu.org/licenses/>.
18  *
19  */
20
21 #include "PipesManager.h"
22 #include "threads/SingleLock.h"
23
24 #ifndef min
25 #define min(a,b) ((a) < (b) ? (a) : (b))
26 #endif
27
28 using namespace XFILE;
29
30
31 Pipe::Pipe(const CStdString &name, int nMaxSize)
32 {
33   m_buffer.Create(nMaxSize);
34   m_nRefCount = 1;
35   m_readEvent.Reset();
36   m_writeEvent.Set();
37   m_strPipeName = name;
38   m_bOpen = true;
39   m_bEof = false;
40   m_nOpenThreashold = PIPE_DEFAULT_MAX_SIZE / 2;
41   m_bReadyForRead = true; // open threashold disabled atm
42 }
43
44 Pipe::~Pipe()
45 {
46 }
47
48 void Pipe::SetOpenThreashold(int threashold)
49 {
50   m_nOpenThreashold = threashold;
51 }
52
53 const CStdString &Pipe::GetName() 
54 {
55   return m_strPipeName;
56 }
57
58 void Pipe::AddRef()
59 {
60   CSingleLock lock(m_lock);
61   m_nRefCount++;
62 }
63
64 void Pipe::DecRef()
65 {
66   CSingleLock lock(m_lock);
67   m_nRefCount--;
68 }
69
70 int  Pipe::RefCount()
71 {
72   CSingleLock lock(m_lock);
73   return m_nRefCount;
74 }
75
76 void Pipe::SetEof()
77 {
78   m_bEof = true;
79 }
80
81 bool Pipe::IsEof()
82 {
83   return m_bEof;
84 }
85
86 bool Pipe::IsEmpty()
87 {
88   return (m_buffer.getMaxReadSize() == 0);
89 }
90
91 void Pipe::Flush()
92 {
93   CSingleLock lock(m_lock);
94
95   if (!m_bOpen || !m_bReadyForRead || m_bEof)
96   {
97     return;
98   }
99   m_buffer.Clear();
100   CheckStatus();
101 }
102
103 int  Pipe::Read(char *buf, int nMaxSize, int nWaitMillis)
104 {
105   CSingleLock lock(m_lock);
106   
107   if (!m_bOpen)
108   {
109     return -1;
110   }
111
112   while (!m_bReadyForRead && !m_bEof)
113     m_readEvent.WaitMSec(100);
114
115   int nResult = 0;
116   if (!IsEmpty())
117   {
118     int nToRead = min((int)m_buffer.getMaxReadSize(), nMaxSize);
119     m_buffer.ReadData(buf, nToRead);
120     nResult = nToRead;
121   }
122   else if (m_bEof)
123   {
124     nResult = 0;
125   }
126   else
127   {
128     // we're leaving the guard - add ref to make sure we are not getting erased.
129     // at the moment we leave m_listeners unprotected which might be a problem in future
130     // but as long as we only have 1 listener attaching at startup and detaching on close we're fine
131     AddRef();
132     lock.Leave();
133
134     bool bHasData = false;
135     int nMillisLeft = nWaitMillis;
136     if (nMillisLeft < 0)
137       nMillisLeft = 5*60*1000; // arbitrary. 5 min.
138
139     do
140     {
141       for (size_t l=0; l<m_listeners.size(); l++)
142         m_listeners[l]->OnPipeUnderFlow();
143
144       bHasData = m_readEvent.WaitMSec(min(200,nMillisLeft));
145       nMillisLeft -= 200;
146     } while (!bHasData && nMillisLeft > 0 && !m_bEof);
147
148     lock.Enter();
149     DecRef();
150     
151     if (!m_bOpen)
152       return -1;
153     
154     if (bHasData)
155     {
156       int nToRead = min((int)m_buffer.getMaxReadSize(), nMaxSize);
157       m_buffer.ReadData(buf, nToRead);
158       nResult = nToRead;
159     }
160   }
161   
162   CheckStatus();
163   
164   return nResult;
165 }
166
167 bool Pipe::Write(const char *buf, int nSize, int nWaitMillis)
168 {
169   CSingleLock lock(m_lock);
170   if (!m_bOpen)
171     return false;
172   bool bOk = false;
173   int writeSize = m_buffer.getMaxWriteSize();
174   if (writeSize > nSize)
175   {
176     m_buffer.WriteData((char*)buf, nSize);
177     bOk = true;
178   }
179   else
180   {
181     while ( (int)m_buffer.getMaxWriteSize() < nSize && m_bOpen )
182     {
183       lock.Leave();
184       for (size_t l=0; l<m_listeners.size(); l++)
185         m_listeners[l]->OnPipeOverFlow();
186
187       bool bClear = nWaitMillis < 0 ? m_writeEvent.Wait() : m_writeEvent.WaitMSec(nWaitMillis);
188       lock.Enter();
189       if (bClear && (int)m_buffer.getMaxWriteSize() >= nSize)
190       {
191         m_buffer.WriteData((char*)buf, nSize);
192         bOk = true;
193         break;
194       }
195
196       // FIXME: is this right? Shouldn't we see if the time limit has been reached?
197       if (nWaitMillis > 0)
198         break;
199     }
200   }
201
202   CheckStatus();
203   
204   return bOk && m_bOpen;
205 }
206
207 void Pipe::CheckStatus()
208 {
209   if (m_bEof)
210   {
211     m_writeEvent.Set();
212     m_readEvent.Set();  
213     return;
214   }
215   
216   if (m_buffer.getMaxWriteSize() == 0)
217     m_writeEvent.Reset();
218   else
219     m_writeEvent.Set();
220   
221   if (m_buffer.getMaxReadSize() == 0)
222     m_readEvent.Reset();
223   else
224   {
225     if (!m_bReadyForRead  && (int)m_buffer.getMaxReadSize() >= m_nOpenThreashold)
226       m_bReadyForRead = true;
227     m_readEvent.Set();  
228   }
229 }
230
231 void Pipe::Close()
232 {
233   CSingleLock lock(m_lock);
234   m_bOpen = false;
235   m_readEvent.Set();
236   m_writeEvent.Set();
237 }
238
239 void Pipe::AddListener(IPipeListener *l)
240 {
241   CSingleLock lock(m_lock);
242   for (size_t i=0; i<m_listeners.size(); i++)
243   {
244     if (m_listeners[i] == l)
245       return;
246   }
247   m_listeners.push_back(l);
248 }
249
250 void Pipe::RemoveListener(IPipeListener *l)
251 {
252   CSingleLock lock(m_lock);
253   std::vector<XFILE::IPipeListener *>::iterator i = m_listeners.begin();
254   while(i != m_listeners.end())
255   {
256     if ( (*i) == l)
257       i = m_listeners.erase(i);
258     else
259       i++;
260   }
261 }
262
263 int     Pipe::GetAvailableRead()
264 {
265   CSingleLock lock(m_lock);
266   return m_buffer.getMaxReadSize();
267 }
268
269 PipesManager::PipesManager() : m_nGenIdHelper(1)
270 {
271 }
272
273 PipesManager::~PipesManager()
274 {
275 }
276
277 PipesManager &PipesManager::GetInstance()
278 {
279   static PipesManager instance;
280   return instance;
281 }
282
283 CStdString   PipesManager::GetUniquePipeName()
284 {
285   CSingleLock lock(m_lock);
286   CStdString id;
287   id.Format("pipe://%d/",m_nGenIdHelper++);
288   return id;
289 }
290
291 XFILE::Pipe *PipesManager::CreatePipe(const CStdString &name, int nMaxPipeSize)
292 {
293   CStdString pName = name;
294   if (pName.IsEmpty())
295     pName = GetUniquePipeName();
296   
297   CSingleLock lock(m_lock);
298   if (m_pipes.find(pName) != m_pipes.end())
299     return NULL;
300   
301   XFILE::Pipe *p = new XFILE::Pipe(pName, nMaxPipeSize);
302   m_pipes[pName] = p;
303   return p;
304 }
305
306 XFILE::Pipe *PipesManager::OpenPipe(const CStdString &name)
307 {
308   CSingleLock lock(m_lock);
309   if (m_pipes.find(name) == m_pipes.end())
310     return NULL;
311   m_pipes[name]->AddRef();
312   return m_pipes[name];
313 }
314
315 void         PipesManager::ClosePipe(XFILE::Pipe *pipe)
316 {
317   CSingleLock lock(m_lock);
318   if (!pipe)
319     return ;
320   
321   pipe->DecRef();
322   pipe->Close();
323   if (pipe->RefCount() == 0)
324   {
325     m_pipes.erase(pipe->GetName());
326     delete pipe;
327   }
328 }
329
330 bool         PipesManager::Exists(const CStdString &name)
331 {
332   CSingleLock lock(m_lock);
333   return (m_pipes.find(name) != m_pipes.end());
334 }
335