initial import
[vuplus_webkit] / Tools / Scripts / webkitpy / common / system / filesystem_mock.py
1 # Copyright (C) 2009 Google Inc. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
5 # met:
6 #
7 #    * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 #    * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
12 # distribution.
13 #    * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29 import errno
30 import hashlib
31 import os
32 import re
33
34 from webkitpy.common.system import path
35 from webkitpy.common.system import ospath
36
37
class MockFileSystem(object):
    """In-memory stand-in for a filesystem, intended for unit tests.

    State is held in plain attributes so tests can inspect it directly:
        files: dict of path -> contents. A value of None marks a file
            that does not (or no longer) exist.
        written_files: dict of path -> contents recording the writes,
            copies, moves and removals performed through this object.
        dirs: set of paths known to be directories.
        cwd: the current working directory.

    Paths always use '/' as the separator, whatever the host OS uses.
    """

    def __init__(self, files=None, dirs=None, cwd='/'):
        """Initializes a "mock" filesystem that can be used to completely
        stub out a filesystem.

        Args:
            files: a dict of filenames -> file contents. A file contents
                value of None is used to indicate that the file should
                not exist.
            dirs: an optional iterable of paths to register as directories.
            cwd: the initial working directory; it is also registered as
                a directory.
        """
        self.files = files or {}
        self.written_files = {}
        self._sep = '/'
        self.current_tmpno = 0
        self.cwd = cwd
        self.dirs = set(dirs or [])
        self.dirs.add(cwd)
        # Register every ancestor directory of every initial file. The walk
        # stops as soon as it reaches a directory that is already known.
        for filename in self.files:
            parent = self.dirname(filename)
            while parent not in self.dirs:
                self.dirs.add(parent)
                parent = self.dirname(parent)

    def _get_sep(self):
        return self._sep

    sep = property(_get_sep, doc="pathname separator")

    def _raise_not_found(self, path):
        """Raises IOError(ENOENT) for path."""
        raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT))

    def _split(self, path):
        """Returns [head, tail], split at the last separator in path.

        A path without any separator yields ['', path], so callers can
        always index both elements.
        """
        if self.sep not in path:
            return ['', path]
        return path.rsplit(self.sep, 1)

    def abspath(self, path):
        """Returns a normalized absolute version of path, relative to cwd."""
        if os.path.isabs(path):
            return self.normpath(path)
        return self.abspath(self.join(self.cwd, path))

    def basename(self, path):
        """Returns the part of path after the last separator."""
        return self._split(path)[1]

    def expanduser(self, path):
        """Replaces a leading "~" component with the mock home directory.

        Paths that do not start with "~" (including the empty string) are
        returned unchanged.
        """
        if not path.startswith("~"):
            return path
        parts = path.split(self.sep, 1)
        home_directory = self.sep + "Users" + self.sep + "mock"
        if len(parts) == 1:
            return home_directory
        return home_directory + self.sep + parts[1]

    def path_to_module(self, module_name):
        """Returns the mock checkout path for a webkitpy module name."""
        return "/mock-checkout/Tools/Scripts/webkitpy/%s" % module_name

    def chdir(self, path):
        """Changes cwd to path; raises OSError(ENOENT) if not a directory."""
        path = self.normpath(path)
        if not self.isdir(path):
            raise OSError(errno.ENOENT, path, os.strerror(errno.ENOENT))
        self.cwd = path

    def copyfile(self, source, destination):
        """Copies the contents of source to destination.

        Raises:
            IOError: with ENOENT if source does not exist, or with EISDIR
                if either source or destination is a directory.
        """
        if not self.exists(source):
            self._raise_not_found(source)
        if self.isdir(source):
            raise IOError(errno.EISDIR, source, os.strerror(errno.EISDIR))
        if self.isdir(destination):
            raise IOError(errno.EISDIR, destination, os.strerror(errno.EISDIR))

        self.files[destination] = self.files[source]
        self.written_files[destination] = self.files[source]

    def dirname(self, path):
        """Returns the part of path before the last separator.

        Note that this is '' (not '/') for a top-level path like '/foo',
        and '' for a path that contains no separator at all.
        """
        return self._split(path)[0]

    def exists(self, path):
        """Returns True if path is an existing file or a directory."""
        return self.isfile(path) or self.isdir(path)

    def files_under(self, path, dirs_to_skip=None, file_filter=None):
        """Returns the existing files at or below path.

        Args:
            path: a file or directory path.
            dirs_to_skip: optional list of directory base names; files
                beneath a directory with one of these names are omitted.
            file_filter: optional callable(fs, dirpath, basename) that
                returns True for files to include. Defaults to all files.
        """
        dirs_to_skip = dirs_to_skip or []
        if file_filter is None:
            file_filter = lambda fs, dirpath, basename: True

        if self.isfile(path):
            if file_filter(self, self.dirname(path), self.basename(path)):
                return [path]
            return []

        if self.basename(path) in dirs_to_skip:
            return []

        if not path.endswith(self.sep):
            path += self.sep

        dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip]
        files = []
        for filename, contents in self.files.items():
            # A None entry marks a removed file; it must not be listed.
            if contents is None or not filename.startswith(path):
                continue

            # Keep the separator that ends `path` so that a skipped
            # directory directly beneath it is matched as '/name/' too.
            suffix = filename[len(path) - 1:]
            if any(dir_substring in suffix for dir_substring in dir_substrings):
                continue

            dirpath, basename = self._split(filename)
            if file_filter(self, dirpath, basename):
                files.append(filename)

        return files

    def getcwd(self):
        """Returns the current working directory."""
        return self.cwd

    def glob(self, glob_string):
        """Returns the existing files and known dirs matching glob_string.

        Matching files are listed before matching directories.
        """
        # FIXME: This handles '*', but not '?', '[', or ']'.
        glob_string = re.escape(glob_string)
        glob_string = glob_string.replace('\\*', '[^\\/]*') + '$'
        glob_string = glob_string.replace('\\/', '/')
        matches = re.compile(glob_string).match

        # We could use fnmatch.fnmatch, but that might not do the right thing on windows.
        existing_files = [path for path, contents in self.files.items() if contents is not None]
        # Build real lists so the concatenation does not depend on
        # filter() returning a list.
        return ([path for path in existing_files if matches(path)] +
                [path for path in self.dirs if matches(path)])

    def isabs(self, path):
        """Returns True if path starts with the separator."""
        return path.startswith(self.sep)

    def isfile(self, path):
        """Returns True if path is a known file whose contents are not None."""
        return path in self.files and self.files[path] is not None

    def isdir(self, path):
        """Returns True if path is a known directory or contains a file.

        A directory discovered through the files it contains is remembered
        in self.dirs.
        """
        if path in self.files:
            return False
        path = self.normpath(path)
        if path in self.dirs:
            return True

        # Only match on a directory boundary, so that '/foo' is not treated
        # as a directory merely because '/foobar/baz' exists.
        prefix = path if path.endswith(self.sep) else path + self.sep

        # We need to use a copy of the keys here in order to avoid switching
        # to a different thread and potentially modifying the dict in
        # mid-iteration.
        files = list(self.files)
        # Files marked as removed (None) do not keep a directory alive;
        # otherwise a directory deleted by rmtree() would reappear here.
        result = any(f.startswith(prefix) and self.files.get(f) is not None
                     for f in files)
        if result:
            self.dirs.add(path)
        return result

    def join(self, *comps):
        """Joins comps with os.path.join, then converts the host separator
        to the mock separator."""
        # FIXME: might want tests for this and/or a better comment about how
        # it works.
        return os.path.join(*comps).replace(os.path.sep, self.sep)

    def listdir(self, path):
        """Returns the names directly under path, directories first.

        Raises:
            OSError: if path is not a directory.
        """
        if not self.isdir(path):
            raise OSError("%s is not a directory" % path)

        if not path.endswith(self.sep):
            path += self.sep

        dirs = []
        files = []
        for f in self.files:
            if self.exists(f) and f.startswith(path):
                remaining = f[len(path):]
                if self.sep in remaining:
                    # The file lives in a subdirectory; report that
                    # subdirectory once.
                    subdir = remaining[:remaining.index(self.sep)]
                    if subdir not in dirs:
                        dirs.append(subdir)
                else:
                    files.append(remaining)
        return dirs + files

    def mtime(self, path):
        """Returns 0 for any existing path; raises IOError(ENOENT) otherwise."""
        if self.exists(path):
            return 0
        self._raise_not_found(path)

    def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs):
        """Returns a fresh temporary path; nothing is created.

        Each call consumes one value of the current_tmpno counter, so
        successive paths differ.
        """
        if dir is None:
            dir = self.sep + '__im_tmp'
        curno = self.current_tmpno
        self.current_tmpno += 1
        return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix))

    def mkdtemp(self, **kwargs):
        """Creates a temporary directory and returns an object wrapping it.

        The returned object converts to the directory path with str() and
        can be used as a context manager that yields the path and removes
        the directory on exit. kwargs are forwarded to _mktemp().
        """
        class TemporaryDirectory(object):
            def __init__(self, fs, **kwargs):
                self._kwargs = kwargs
                self._filesystem = fs
                self._directory_path = fs._mktemp(**kwargs)
                fs.maybe_make_directory(self._directory_path)

            def __str__(self):
                return self._directory_path

            def __enter__(self):
                return self._directory_path

            def __exit__(self, type, value, traceback):
                # Only self-delete if necessary.

                # FIXME: Should we delete non-empty directories?
                if self._filesystem.exists(self._directory_path):
                    self._filesystem.rmtree(self._directory_path)

        return TemporaryDirectory(fs=self, **kwargs)

    def maybe_make_directory(self, *path):
        """Registers the joined, normalized path as a directory if needed."""
        norm_path = self.normpath(self.join(*path))
        if not self.isdir(norm_path):
            self.dirs.add(norm_path)

    def move(self, source, destination):
        """Moves the contents of source to destination.

        Raises IOError(ENOENT) if source is marked as removed, and KeyError
        if source was never known.
        """
        if self.files[source] is None:
            self._raise_not_found(source)
        self.files[destination] = self.files[source]
        self.written_files[destination] = self.files[destination]
        self.files[source] = None
        self.written_files[source] = None

    def normpath(self, path):
        """Normalizes path with os.path.normpath, using the mock separator."""
        # Like join(), relies on os.path functionality but normalizes the
        # path separator to the mock one.
        return os.path.normpath(path).replace(os.path.sep, self.sep)

    def open_binary_tempfile(self, suffix=''):
        """Returns (writable file object, path) for a new temporary file."""
        path = self._mktemp(suffix)
        return (WritableBinaryFileObject(self, path), path)

    def open_binary_file_for_reading(self, path):
        """Returns a readable object over the contents of path."""
        if self.files[path] is None:
            self._raise_not_found(path)
        return ReadableBinaryFileObject(self, path, self.files[path])

    def read_binary_file(self, path):
        """Returns the stored contents of path.

        Raises IOError(ENOENT) if the file is marked as removed.
        """
        # Intentionally raises KeyError if we don't recognize the path.
        if self.files[path] is None:
            self._raise_not_found(path)
        return self.files[path]

    def write_binary_file(self, path, contents):
        """Stores contents at path and records the write."""
        self.files[path] = contents
        self.written_files[path] = contents

    def open_text_file_for_reading(self, path):
        """Returns a readable object that decodes path's contents as UTF-8."""
        if self.files[path] is None:
            self._raise_not_found(path)
        # The contents must be handed over explicitly; without them the
        # reader would only ever see its empty default data.
        return ReadableTextFileObject(self, path, self.files[path])

    def open_text_file_for_writing(self, path):
        """Returns a writable object that encodes text as UTF-8 into path."""
        return WritableTextFileObject(self, path)

    def read_text_file(self, path):
        """Returns the contents of path decoded as UTF-8."""
        return self.read_binary_file(path).decode('utf-8')

    def write_text_file(self, path, contents):
        """Stores contents at path, encoded as UTF-8."""
        return self.write_binary_file(path, contents.encode('utf-8'))

    def sha1(self, path):
        """Returns the hex SHA-1 digest of the contents of path."""
        contents = self.read_binary_file(path)
        return hashlib.sha1(contents).hexdigest()

    def relpath(self, path, start='.'):
        """Returns path relative to start, delegating to ospath.relpath."""
        return ospath.relpath(path, start, self.abspath, self.sep)

    def remove(self, path):
        """Marks the file at path as removed.

        Raises IOError(ENOENT) if it is already marked as removed, and
        KeyError if the path was never known.
        """
        if self.files[path] is None:
            self._raise_not_found(path)
        self.files[path] = None
        self.written_files[path] = None

    def rmtree(self, path):
        """Removes path together with every file and directory beneath it."""
        path = self.normpath(path)
        # Match on a directory boundary so that removing '/foo' leaves a
        # sibling such as '/foobar' untouched.
        prefix = path if path.endswith(self.sep) else path + self.sep

        def in_tree(candidate):
            return candidate == path or candidate.startswith(prefix)

        # Only values are replaced here, so iterating the dict is safe.
        for f in self.files:
            if in_tree(f):
                self.files[f] = None

        self.dirs = set(d for d in self.dirs if not in_tree(d))

    def split(self, path):
        """Returns (head, tail), split at the last separator in path.

        head is '' when path contains no separator.
        """
        idx = path.rfind(self.sep)
        if idx == -1:
            return ('', path)
        return (path[:idx], path[(idx + 1):])

    def splitext(self, path):
        """Returns (root, extension), split at the last '.' of the base name.

        The extension is '' when the final path component contains no dot;
        dots in directory names are ignored.
        """
        dot_idx = path.rfind('.')
        # Also covers "no dot at all": rfind() then returns -1, which can
        # never be greater than the separator index.
        if dot_idx <= path.rfind(self.sep):
            return (path, '')
        return (path[:dot_idx], path[dot_idx:])
327
328
class WritableBinaryFileObject(object):
    """File-like object that appends written data to a mock filesystem.

    fs must provide `files` and `written_files` dicts. Creating the object
    sets fs.files[path] to empty contents; each write() appends to it and
    mirrors the result into fs.written_files[path].
    """

    def __init__(self, fs, path):
        self.fs = fs
        self.path = path
        self.closed = False
        # Start from empty *bytes*: the text subclass appends UTF-8 encoded
        # data, which cannot be concatenated onto a text string where the
        # two types differ.
        self.fs.files[path] = b""

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        """Marks the object as closed."""
        self.closed = True

    def write(self, str):
        """Appends str to the file and records the new contents."""
        self.fs.files[self.path] += str
        self.fs.written_files[self.path] = self.fs.files[self.path]
348
349
class WritableTextFileObject(WritableBinaryFileObject):
    """Writable file object that accepts text and stores it as UTF-8."""

    def write(self, str):
        """Encodes str as UTF-8 and appends the result to the file."""
        encoded = str.encode('utf-8')
        super(WritableTextFileObject, self).write(encoded)
353
354
class ReadableBinaryFileObject(object):
    """File-like object that reads sequentially from an in-memory buffer.

    Attributes:
        data: the full contents being read.
        offset: index into data of the next unread position.
        closed: True once close() has been called.
    """

    def __init__(self, fs, path, data=""):
        self.fs = fs
        self.path = path
        self.closed = False
        self.data = data
        self.offset = 0

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        """Marks the object as closed."""
        self.closed = True

    def read(self, bytes=None):
        """Returns up to `bytes` items from the current offset and advances.

        With bytes omitted, None or negative, everything that remains is
        returned. Once the data is exhausted, further reads return an
        empty slice.
        """
        remaining = len(self.data) - self.offset
        if bytes is None or bytes < 0 or bytes > remaining:
            bytes = remaining
        start = self.offset
        # The offset is advanced in every case so that consecutive reads
        # never return the same data twice, and never runs past the end.
        self.offset += bytes
        return self.data[start:self.offset]
378
379
class ReadableTextFileObject(ReadableBinaryFileObject):
    """Readable file object that decodes what it reads as UTF-8."""

    def read(self, bytes=None):
        """Reads like the binary base class, then decodes the result."""
        raw = super(ReadableTextFileObject, self).read(bytes)
        return raw.decode('utf-8')