Source code

Revision control

Copy as Markdown

Other Tools

try:
import unittest2 as unittest # python < 2.7 compat
except ImportError:
import unittest
import tempfile
import shutil
import os
import time
import six
from mock import Mock
from dlmanager import manager as download_manager
def mock_session():
response = Mock()
session = Mock(get=Mock(return_value=response))
return session, response
def mock_response(response, data, wait=0):
data = six.b(data)
def iter_content(chunk_size=4):
rest = data
while rest:
time.sleep(wait)
chunk = rest[:chunk_size]
rest = rest[chunk_size:]
yield chunk
response.headers = {'Content-length': str(len(data))}
response.iter_content = iter_content
class TestDownload(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tempdir)
self.finished = Mock()
self.session, self.session_response = mock_session()
self.tempfile = os.path.join(self.tempdir, 'dest')
self.dl = download_manager.Download('http://url', self.tempfile,
finished_callback=self.finished,
chunk_size=4,
session=self.session)
def test_creation(self):
self.assertFalse(self.dl.is_canceled())
self.assertFalse(self.dl.is_running())
self.assertIsNone(self.dl.error())
self.assertEquals(self.dl.get_url(), 'http://url')
self.assertEquals(self.dl.get_dest(), self.tempfile)
def create_response(self, data, wait=0):
mock_response(self.session_response, data, wait)
def test_download(self):
self.create_response('1234' * 4, 0.01)
# no file present yet
self.assertFalse(os.path.exists(self.tempfile))
self.dl.start()
self.assertTrue(self.dl.is_running())
self.dl.wait()
self.assertFalse(self.dl.is_running())
self.finished.assert_called_with(self.dl)
# file has been downloaded
with open(self.tempfile) as f:
self.assertEquals(f.read(), '1234' * 4)
def test_download_cancel(self):
self.create_response('1234' * 1000, wait=0.01)
start = time.time()
self.dl.start()
time.sleep(0.1)
self.dl.cancel()
with self.assertRaises(download_manager.DownloadInterrupt):
self.dl.wait()
self.assertTrue(self.dl.is_canceled())
# response generation should have taken 1000 * 0.01 = 10 seconds.
# since we canceled, this must be lower.
self.assertTrue((time.time() - start) < 1.0)
# file was deleted
self.assertFalse(os.path.exists(self.tempfile))
# finished callback was called
self.finished.assert_called_with(self.dl)
def test_download_with_progress(self):
data = []
def update_progress(_dl, current, total):
data.append((_dl, current, total))
self.create_response('1234' * 4)
self.dl.set_progress(update_progress)
self.dl.start()
self.dl.wait()
self.assertEquals(data, [
(self.dl, 0, 16),
(self.dl, 4, 16),
(self.dl, 8, 16),
(self.dl, 12, 16),
(self.dl, 16, 16),
])
# file has been downloaded
with open(self.tempfile) as f:
self.assertEquals(f.read(), '1234' * 4)
# finished callback was called
self.finished.assert_called_with(self.dl)
def test_download_error_in_thread(self):
self.session_response.headers = {'Content-length': '24'}
self.session_response.iter_content.side_effect = IOError
self.dl.start()
with self.assertRaises(IOError):
self.dl.wait()
self.assertEquals(self.dl.error()[0], IOError)
# finished callback was called
self.finished.assert_called_with(self.dl)
def test_wait_does_not_block_on_exception(self):
# this test the case when a user may hit CTRL-C for example
# during a dl.wait() call.
self.create_response('1234' * 1000, wait=0.01)
original_join = self.dl.thread.join
it = iter('123')
def join(timeout=None):
next(it) # will throw StopIteration after a few calls
original_join(timeout)
self.dl.thread.join = join
start = time.time()
self.dl.start()
with self.assertRaises(StopIteration):
self.dl.wait()
self.assertTrue(self.dl.is_canceled())
# wait for the thread to finish
original_join()
# response generation should have taken 1000 * 0.01 = 10 seconds.
# since we got an error, this must be lower.
self.assertTrue((time.time() - start) < 1.0)
# file was deleted
self.assertFalse(os.path.exists(self.tempfile))
# finished callback was called
self.finished.assert_called_with(self.dl)
class TestDownloadManager(unittest.TestCase):
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tempdir)
self.dl_manager = download_manager.DownloadManager(self.tempdir)
def do_download(self, url, fname, data, wait=0):
session, response = mock_session()
mock_response(response, data, wait)
# patch the session, so the download will use that
self.dl_manager.session = session
return self.dl_manager.download(url, fname)
def test_download(self):
dl1 = self.do_download('http://foo', 'foo', 'hello' * 4, wait=0.02)
self.assertIsInstance(dl1, download_manager.Download)
self.assertTrue(dl1.is_running())
# with the same fname, no new download is started. The same instance
# is returned since the download is running.
dl2 = self.do_download('http://bar', 'foo', 'hello2' * 4, wait=0.02)
self.assertEquals(dl1, dl2)
# starting a download with another fname will trigger a new download
dl3 = self.do_download('http://bar', 'foo2', 'hello you' * 4)
self.assertIsInstance(dl3, download_manager.Download)
self.assertNotEquals(dl3, dl1)
# let's wait for the downloads to finish
dl3.wait()
dl1.wait()
# now if we try to download a fname that exists, None is returned
dl4 = self.do_download('http://bar', 'foo', 'hello2' * 4, wait=0.02)
self.assertIsNone(dl4)
# downloaded files are what is expected
def content(fname):
with open(os.path.join(self.tempdir, fname)) as f:
return f.read()
self.assertEquals(content('foo'), 'hello' * 4)
self.assertEquals(content('foo2'), 'hello you' * 4)
# download instances are removed from the manager (internal test)
self.assertEquals(self.dl_manager._downloads, {})
def test_cancel(self):
dl1 = self.do_download('http://foo', 'foo', 'foo' * 50000, wait=0.02)
dl2 = self.do_download('http://foo', 'bar', 'bar' * 50000, wait=0.02)
dl3 = self.do_download('http://foo', 'foobar', 'foobar' * 4)
# let's cancel only one
def cancel_if(dl):
if os.path.basename(dl.get_dest()) == 'foo':
return True
self.dl_manager.cancel(cancel_if=cancel_if)
self.assertTrue(dl1.is_canceled())
self.assertFalse(dl2.is_canceled())
self.assertFalse(dl3.is_canceled())
# wait for dl3
dl3.wait()
# cancel everything
self.dl_manager.cancel()
self.assertTrue(dl1.is_canceled())
self.assertTrue(dl2.is_canceled())
# dl3 is not canceled since it finished before
self.assertFalse(dl3.is_canceled())
# wait for the completion of dl1 and dl2 threads
dl1.wait(raise_if_error=False)
dl2.wait(raise_if_error=False)
# at the end, only dl3 has been downloaded
self.assertEquals(os.listdir(self.tempdir), ["foobar"])
with open(os.path.join(self.tempdir, 'foobar')) as f:
self.assertEquals(f.read(), 'foobar' * 4)
# download instances are removed from the manager (internal test)
self.assertEquals(self.dl_manager._downloads, {})