1 # Copyright (C) 2009 Google Inc. All rights reserved.
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are
7 # * Redistributions of source code must retain the above copyright
8 # notice, this list of conditions and the following disclaimer.
9 # * Redistributions in binary form must reproduce the above
10 # copyright notice, this list of conditions and the following disclaimer
11 # in the documentation and/or other materials provided with the
13 # * Neither the name of Google Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34 from webkitpy.common.system import path
35 from webkitpy.common.system import ospath
38 class MockFileSystem(object):
39 def __init__(self, files=None, dirs=None, cwd='/'):
40 """Initializes a "mock" filesystem that can be used to completely
41 stub out a filesystem.
44 files: a dict of filenames -> file contents. A file contents
45 value of None is used to indicate that the file should
48 self.files = files or {}
49 self.written_files = {}
51 self.current_tmpno = 0
53 self.dirs = set(dirs or [])
57 while not d in self.dirs:
64 sep = property(_get_sep, doc="pathname separator")
66 def _raise_not_found(self, path):
67 raise IOError(errno.ENOENT, path, os.strerror(errno.ENOENT))
69 def _split(self, path):
70 return path.rsplit(self.sep, 1)
72 def abspath(self, path):
73 if os.path.isabs(path):
74 return self.normpath(path)
75 return self.abspath(self.join(self.cwd, path))
77 def basename(self, path):
78 return self._split(path)[1]
80 def expanduser(self, path):
83 parts = path.split(self.sep, 1)
84 home_directory = self.sep + "Users" + self.sep + "mock"
87 return home_directory + self.sep + parts[1]
89 def path_to_module(self, module_name):
90 return "/mock-checkout/Tools/Scripts/webkitpy/%s" % module_name
92 def chdir(self, path):
93 path = self.normpath(path)
94 if not self.isdir(path):
95 raise OSError(errno.ENOENT, path, os.strerror(errno.ENOENT))
98 def copyfile(self, source, destination):
99 if not self.exists(source):
100 self._raise_not_found(source)
101 if self.isdir(source):
102 raise IOError(errno.EISDIR, source, os.strerror(errno.ISDIR))
103 if self.isdir(destination):
104 raise IOError(errno.EISDIR, destination, os.strerror(errno.ISDIR))
106 self.files[destination] = self.files[source]
107 self.written_files[destination] = self.files[source]
109 def dirname(self, path):
110 return self._split(path)[0]
112 def exists(self, path):
113 return self.isfile(path) or self.isdir(path)
115 def files_under(self, path, dirs_to_skip=[], file_filter=None):
116 def filter_all(fs, dirpath, basename):
119 file_filter = file_filter or filter_all
121 if self.isfile(path):
122 if file_filter(self, self.dirname(path), self.basename(path)):
126 if self.basename(path) in dirs_to_skip:
129 if not path.endswith(self.sep):
132 dir_substrings = [self.sep + d + self.sep for d in dirs_to_skip]
133 for filename in self.files:
134 if not filename.startswith(path):
137 suffix = filename[len(path) - 1:]
138 if any(dir_substring in suffix for dir_substring in dir_substrings):
141 dirpath, basename = self._split(filename)
142 if file_filter(self, dirpath, basename):
143 files.append(filename)
150 def glob(self, glob_string):
151 # FIXME: This handles '*', but not '?', '[', or ']'.
152 glob_string = re.escape(glob_string)
153 glob_string = glob_string.replace('\\*', '[^\\/]*') + '$'
154 glob_string = glob_string.replace('\\/', '/')
155 path_filter = lambda path: re.match(glob_string, path)
157 # We could use fnmatch.fnmatch, but that might not do the right thing on windows.
158 existing_files = [path for path, contents in self.files.items() if contents is not None]
159 return filter(path_filter, existing_files) + filter(path_filter, self.dirs)
161 def isabs(self, path):
162 return path.startswith(self.sep)
164 def isfile(self, path):
165 return path in self.files and self.files[path] is not None
167 def isdir(self, path):
168 if path in self.files:
170 path = self.normpath(path)
171 if path in self.dirs:
174 # We need to use a copy of the keys here in order to avoid switching
175 # to a different thread and potentially modifying the dict in
177 files = self.files.keys()[:]
178 result = any(f.startswith(path) for f in files)
183 def join(self, *comps):
184 # FIXME: might want tests for this and/or a better comment about how
186 return re.sub(re.escape(os.path.sep), self.sep, os.path.join(*comps))
188 def listdir(self, path):
189 if not self.isdir(path):
190 raise OSError("%s is not a directory" % path)
192 if not path.endswith(self.sep):
198 if self.exists(f) and f.startswith(path):
199 remaining = f[len(path):]
200 if self.sep in remaining:
201 dir = remaining[:remaining.index(self.sep)]
205 files.append(remaining)
208 def mtime(self, path):
209 if self.exists(path):
211 self._raise_not_found(path)
213 def _mktemp(self, suffix='', prefix='tmp', dir=None, **kwargs):
215 dir = self.sep + '__im_tmp'
216 curno = self.current_tmpno
217 self.current_tmpno += 1
218 return self.join(dir, "%s_%u_%s" % (prefix, curno, suffix))
220 def mkdtemp(self, **kwargs):
221 class TemporaryDirectory(object):
222 def __init__(self, fs, **kwargs):
223 self._kwargs = kwargs
224 self._filesystem = fs
225 self._directory_path = fs._mktemp(**kwargs)
226 fs.maybe_make_directory(self._directory_path)
229 return self._directory_path
232 return self._directory_path
234 def __exit__(self, type, value, traceback):
235 # Only self-delete if necessary.
237 # FIXME: Should we delete non-empty directories?
238 if self._filesystem.exists(self._directory_path):
239 self._filesystem.rmtree(self._directory_path)
241 return TemporaryDirectory(fs=self, **kwargs)
243 def maybe_make_directory(self, *path):
244 norm_path = self.normpath(self.join(*path))
245 if not self.isdir(norm_path):
246 self.dirs.add(norm_path)
248 def move(self, source, destination):
249 if self.files[source] is None:
250 self._raise_not_found(source)
251 self.files[destination] = self.files[source]
252 self.written_files[destination] = self.files[destination]
253 self.files[source] = None
254 self.written_files[source] = None
256 def normpath(self, path):
257 # Like join(), relies on os.path functionality but normalizes the
258 # path separator to the mock one.
259 return re.sub(re.escape(os.path.sep), self.sep, os.path.normpath(path))
261 def open_binary_tempfile(self, suffix=''):
262 path = self._mktemp(suffix)
263 return (WritableBinaryFileObject(self, path), path)
265 def open_binary_file_for_reading(self, path):
266 if self.files[path] is None:
267 self._raise_not_found(path)
268 return ReadableBinaryFileObject(self, path, self.files[path])
270 def read_binary_file(self, path):
271 # Intentionally raises KeyError if we don't recognize the path.
272 if self.files[path] is None:
273 self._raise_not_found(path)
274 return self.files[path]
276 def write_binary_file(self, path, contents):
277 self.files[path] = contents
278 self.written_files[path] = contents
280 def open_text_file_for_reading(self, path):
281 if self.files[path] is None:
282 self._raise_not_found(path)
283 return ReadableTextFileObject(self, path)
285 def open_text_file_for_writing(self, path):
286 return WritableTextFileObject(self, path)
288 def read_text_file(self, path):
289 return self.read_binary_file(path).decode('utf-8')
291 def write_text_file(self, path, contents):
292 return self.write_binary_file(path, contents.encode('utf-8'))
294 def sha1(self, path):
295 contents = self.read_binary_file(path)
296 return hashlib.sha1(contents).hexdigest()
298 def relpath(self, path, start='.'):
299 return ospath.relpath(path, start, self.abspath, self.sep)
301 def remove(self, path):
302 if self.files[path] is None:
303 self._raise_not_found(path)
304 self.files[path] = None
305 self.written_files[path] = None
307 def rmtree(self, path):
308 path = self.normpath(path)
311 if f.startswith(path):
314 self.dirs = set(filter(lambda d: not d.startswith(path), self.dirs))
316 def split(self, path):
317 idx = path.rfind(self.sep)
320 return (path[:idx], path[(idx + 1):])
322 def splitext(self, path):
323 idx = path.rfind('.')
326 return (path[0:idx], path[idx:])
329 class WritableBinaryFileObject(object):
330 def __init__(self, fs, path):
334 self.fs.files[path] = ""
339 def __exit__(self, type, value, traceback):
345 def write(self, str):
346 self.fs.files[self.path] += str
347 self.fs.written_files[self.path] = self.fs.files[self.path]
350 class WritableTextFileObject(WritableBinaryFileObject):
351 def write(self, str):
352 WritableBinaryFileObject.write(self, str.encode('utf-8'))
355 class ReadableBinaryFileObject(object):
356 def __init__(self, fs, path, data=""):
366 def __exit__(self, type, value, traceback):
372 def read(self, bytes=None):
374 return self.data[self.offset:]
377 return self.data[start:self.offset]
380 class ReadableTextFileObject(ReadableBinaryFileObject):
381 def read(self, bytes=None):
382 return ReadableBinaryFileObject.read(self, bytes).decode('utf-8')