initial import
[vuplus_webkit] / Tools / Scripts / webkitpy / common / system / executive_unittest.py
1 # Copyright (C) 2010 Google Inc. All rights reserved.
2 # Copyright (C) 2009 Daniel Bates (dbates@intudata.com). All rights reserved.
3 #
4 # Redistribution and use in source and binary forms, with or without
5 # modification, are permitted provided that the following conditions are
6 # met:
7 #
8 #    * Redistributions of source code must retain the above copyright
9 # notice, this list of conditions and the following disclaimer.
10 #    * Redistributions in binary form must reproduce the above
11 # copyright notice, this list of conditions and the following disclaimer
12 # in the documentation and/or other materials provided with the
13 # distribution.
14 #    * Neither the name of Google Inc. nor the names of its
15 # contributors may be used to endorse or promote products derived from
16 # this software without specific prior written permission.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 import os
31 import signal
32 import subprocess
33 import sys
34 import unittest
35
36 from webkitpy.common.system.executive import Executive, ScriptError
37 from webkitpy.common.system.filesystem_mock import MockFileSystem
38 from webkitpy.test import cat, echo
39
40
class ScriptErrorTest(unittest.TestCase):
    """Tests for the message-formatting helpers of ScriptError."""

    def test_string_from_args(self):
        """_string_from_args() renders its argument as text and shortens long
        results (the expected value for 30 items ends in '...')."""
        error = ScriptError()
        self.assertEqual(error._string_from_args(None), 'None')
        self.assertEqual(error._string_from_args([]), '[]')
        # Build a real list: on Python 3 map() returns a lazy iterator, whose
        # text form would not be the list representation expected below.
        many_args = [str(number) for number in range(30)]
        self.assertEqual(error._string_from_args(many_args), "['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17'...")

    def test_message_with_output(self):
        """message_with_output() combines the message (or a generated
        'Failed to run' line) with the captured output."""
        # No output: only the custom message is returned.
        error = ScriptError('My custom message!', '', -1)
        self.assertEqual(error.message_with_output(), 'My custom message!')

        # Output is appended after a blank line.
        error = ScriptError('My custom message!', '', -1, 'My output.')
        self.assertEqual(error.message_with_output(), 'My custom message!\n\nMy output.')

        # With an empty message, one is generated from the command, exit code and cwd.
        error = ScriptError('', 'my_command!', -1, 'My output.', '/Users/username/blah')
        self.assertEqual(error.message_with_output(), 'Failed to run "my_command!" exit_code: -1 cwd: /Users/username/blah\n\nMy output.')

        # 501 characters of output: only the last 500 are expected to be kept.
        error = ScriptError('', 'my_command!', -1, 'ab' + '1' * 499)
        self.assertEqual(error.message_with_output(), 'Failed to run "my_command!" exit_code: -1\n\nLast 500 characters of output:\nb' + '1' * 499)
57
def never_ending_command():
    """Return the argument list of a command that keeps running until killed.

    Used by the process-killing tests. The program should be one that is
    unlikely to be running already, because those tests kill every instance
    of it.
    """
    # 'wmic' is used on native Windows, 'yes' on every other platform.
    program = 'wmic' if sys.platform == 'win32' else 'yes'
    return [program]
65
66
class ExecutiveTest(unittest.TestCase):
    """Tests for Executive: interpreter detection, running commands, and
    killing/inspecting processes.

    Several of these tests spawn real subprocesses.
    """

    def assert_interpreter_for_content(self, intepreter, content):
        """Assert that Executive.interpreter_for_script() returns |intepreter|
        for a script, stored in a mock filesystem, whose text is |content|."""
        fs = MockFileSystem()

        tempfile, temp_name = fs.open_binary_tempfile('')
        tempfile.write(content)
        tempfile.close()
        file_interpreter = Executive.interpreter_for_script(temp_name, fs)

        self.assertEqual(file_interpreter, intepreter)

    def test_interpreter_for_script(self):
        """The interpreter is derived from the script's first line; content
        without a '#!' prefix yields None."""
        self.assert_interpreter_for_content(None, '')
        self.assert_interpreter_for_content(None, 'abcd\nefgh\nijklm')
        self.assert_interpreter_for_content(None, '##/usr/bin/perl')
        self.assert_interpreter_for_content('perl', '#!/usr/bin/env perl')
        self.assert_interpreter_for_content('perl', '#!/usr/bin/env perl\nfirst\nsecond')
        self.assert_interpreter_for_content('perl', '#!/usr/bin/perl')
        self.assert_interpreter_for_content('perl', '#!/usr/bin/perl -w')
        # Python scripts are expected to map to the running interpreter.
        self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/env python')
        self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/env python\nfirst\nsecond')
        self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/python')
        self.assert_interpreter_for_content('ruby', '#!/usr/bin/env ruby')
        self.assert_interpreter_for_content('ruby', '#!/usr/bin/env ruby\nfirst\nsecond')
        self.assert_interpreter_for_content('ruby', '#!/usr/bin/ruby')

    def test_run_command_with_bad_command(self):
        """Running a nonexistent program raises OSError even when an
        ignoring error handler is supplied."""
        def run_bad_command():
            Executive().run_command(["foo_bar_command_blah"], error_handler=Executive.ignore_error, return_exit_code=True)
        self.assertRaises(OSError, run_bad_command)

    def test_run_command_args_type(self):
        """run_command() rejects a bare string and accepts a list or a tuple."""
        executive = Executive()
        self.assertRaises(AssertionError, executive.run_command, "echo")
        self.assertRaises(AssertionError, executive.run_command, u"echo")
        executive.run_command(echo.command_arguments('foo'))
        executive.run_command(tuple(echo.command_arguments('foo')))

    def test_run_command_with_unicode(self):
        """Validate that it is safe to pass unicode() objects
        to Executive.run* methods, and they will return unicode()
        objects by default unless decode_output=False"""
        unicode_tor_input = u"WebKit \u2661 Tor Arne Vestb\u00F8!"
        if sys.platform == 'win32':
            encoding = 'mbcs'
        else:
            encoding = 'utf-8'
        encoded_tor = unicode_tor_input.encode(encoding)
        # On Windows, we expect the unicode->mbcs->unicode roundtrip to be
        # lossy. On other platforms, we expect a lossless roundtrip.
        if sys.platform == 'win32':
            unicode_tor_output = encoded_tor.decode(encoding)
        else:
            unicode_tor_output = unicode_tor_input

        executive = Executive()

        output = executive.run_command(cat.command_arguments(), input=unicode_tor_input)
        self.assertEqual(output, unicode_tor_output)

        output = executive.run_command(echo.command_arguments("-n", unicode_tor_input))
        self.assertEqual(output, unicode_tor_output)

        output = executive.run_command(echo.command_arguments("-n", unicode_tor_input), decode_output=False)
        self.assertEqual(output, encoded_tor)

        # Make sure that str() input also works.
        output = executive.run_command(cat.command_arguments(), input=encoded_tor, decode_output=False)
        self.assertEqual(output, encoded_tor)

        # FIXME: We should only have one run* method to test
        output = executive.run_and_throw_if_fail(echo.command_arguments("-n", unicode_tor_input), quiet=True)
        self.assertEqual(output, unicode_tor_output)

        output = executive.run_and_throw_if_fail(echo.command_arguments("-n", unicode_tor_input), quiet=True, decode_output=False)
        self.assertEqual(output, encoded_tor)

    def test_kill_process(self):
        """kill_process() terminates a running child by pid, and killing the
        same pid again does not raise."""
        executive = Executive()
        process = subprocess.Popen(never_ending_command(), stdout=subprocess.PIPE)
        self.assertEqual(process.poll(), None)  # Process is running
        executive.kill_process(process.pid)
        # Note: Can't use a ternary since signal.SIGKILL is undefined for sys.platform == "win32"
        if sys.platform == "win32":
            # FIXME: https://bugs.webkit.org/show_bug.cgi?id=54790
            # We seem to get either 0 or 1 here for some reason.
            self.assertTrue(process.wait() in (0, 1))
        else:
            expected_exit_code = -signal.SIGKILL
            self.assertEqual(process.wait(), expected_exit_code)
        # Killing again should fail silently.
        executive.kill_process(process.pid)

    def _assert_windows_image_name(self, name, expected_windows_name):
        """Assert that Executive._windows_image_name(name) returns
        |expected_windows_name|."""
        executive = Executive()
        windows_name = executive._windows_image_name(name)
        self.assertEqual(windows_name, expected_windows_name)

    def test_windows_image_name(self):
        """'.exe' is appended only to names that have no extension-like suffix."""
        self._assert_windows_image_name("foo", "foo.exe")
        self._assert_windows_image_name("foo.exe", "foo.exe")
        self._assert_windows_image_name("foo.com", "foo.com")
        # If the name looks like an extension, even if it isn't
        # supposed to, we have no choice but to return the original name.
        self._assert_windows_image_name("foo.baz", "foo.baz")
        self._assert_windows_image_name("foo.baz.exe", "foo.baz.exe")

    def test_kill_all(self):
        """kill_all() terminates running processes by program name, and
        calling it again with nothing left to kill does not raise."""
        executive = Executive()
        # never_ending_command() names a program that keeps running until killed.
        process = subprocess.Popen(never_ending_command(), stdout=subprocess.PIPE)
        self.assertEqual(process.poll(), None)  # Process is running
        executive.kill_all(never_ending_command()[0])
        # The exit status observed after kill_all() differs per platform, so
        # each platform gets its own expectation.
        if sys.platform == "cygwin":
            expected_exit_code = 0  # os.kill results in exit(0) for this process.
            self.assertEqual(process.wait(), expected_exit_code)
        elif sys.platform == "win32":
            # FIXME: https://bugs.webkit.org/show_bug.cgi?id=54790
            # We seem to get either 0 or 1 here for some reason.
            self.assertTrue(process.wait() in (0, 1))
        else:
            expected_exit_code = -signal.SIGTERM
            self.assertEqual(process.wait(), expected_exit_code)
        # Killing again should fail silently.
        executive.kill_all(never_ending_command()[0])

    def test_check_running_pid(self):
        """check_running_pid() is true for this process and false for a pid
        assumed not to exist."""
        executive = Executive()
        self.assertTrue(executive.check_running_pid(os.getpid()))
        # Maximum pid number on Linux is 32768 by default
        # NOTE(review): on systems configured with a higher pid limit, pid
        # 100000 could in principle be in use -- confirm this is acceptable.
        self.assertFalse(executive.check_running_pid(100000))

    def test_running_pids(self):
        """running_pids() includes the pid of the current process."""
        if sys.platform in ("win32", "cygwin"):
            return  # This function isn't implemented on Windows yet.

        executive = Executive()
        pids = executive.running_pids()
        self.assertTrue(os.getpid() in pids)