@@ -1,42 +1,59 @@
+from collections import namedtuple
 import csv
-import datetime
 import unittest
+from unittest.mock import patch
 
-from usertweets import UserTweets
-from usertweets import NUM_TWEETS
 
-DT = datetime.datetime(2017, 1, 13, 9, 0, 5)
+from tweets import TWEETS  # mock data
+from usertweets import UserTweets, NUM_TWEETS
+
 HANDLE = 'pybites'
 MAX_ID = '819831370113351680'
-TWEETS = (
-    """5 cool things you can do with itertools https://t.co/Nk4s3yL6zL #python""",
-    """How to create a nice-looking HTML page of your #Kindle book highlights (notes) https://t.co/HKFK7inhUa #python""",
-)
-USER = UserTweets(HANDLE, max_id=MAX_ID)
-
-def read_csv():
-    with open(USER.output_file) as f:
+
+Tweet = namedtuple('Tweet', ['id_str', 'created_at', 'text'])
+
+
+def read_csv(fname):
+    with open(fname) as f:
+        has_header = csv.Sniffer().has_header(f.readline())
+        f.seek(0)
         r = csv.reader(f)
-        next(r, None)  # skip the headers
-        return list(r)
+        if has_header:
+            next(r, None)  # skip the header
+        return [Tweet(*tw) for tw in r]  # list(r)
+
 
 class TestUserTweets(unittest.TestCase):
+    def setUp(self):
+        super().setUp()
+        with patch('tweepy.API.user_timeline') as mock_timeline:
+            mock_timeline.return_value = TWEETS
+            self.user = UserTweets(HANDLE, max_id=MAX_ID)
+
+    def tearDown(self):
+        self.user = None
+        super().tearDown()
 
     def test_num_tweets(self):
-        self.assertEqual(len(USER), NUM_TWEETS)
+        self.assertEqual(len(self.user), NUM_TWEETS)
 
     def test_first_tweet_returned_by_api(self):
-        self.assertEqual(USER[0].id_str, MAX_ID)
-        self.assertEqual(USER[0].created_at, DT)
-        self.assertEqual(USER[0].text, TWEETS[0])
+        tw_n = 0
+        self.assertEqual(self.user[tw_n].id_str, MAX_ID)
+        self.assertEqual(self.user[tw_n].created_at, TWEETS[tw_n].created_at)
+        self.assertEqual(self.user[tw_n].text, TWEETS[tw_n].text)
 
     def test_read_back_from_cached_csv(self):
-        csv_tweets = read_csv()
+        csv_tweets = read_csv(self.user.output_file)
         self.assertEqual(len(csv_tweets), NUM_TWEETS)
-        self.assertEqual(csv_tweets[0][0], MAX_ID)
-        self.assertEqual(csv_tweets[0][1], str(DT))
-        self.assertEqual(csv_tweets[0][2], TWEETS[0])
-        self.assertEqual(csv_tweets[-1][2], TWEETS[1])
+        tw_n = 0  # first
+        self.assertEqual(csv_tweets[tw_n].id_str, MAX_ID)
+        self.assertEqual(csv_tweets[tw_n].created_at,
+                         str(TWEETS[tw_n].created_at))
+        self.assertEqual(csv_tweets[tw_n].text, TWEETS[tw_n].text)
+        tw_n = -1  # last
+        self.assertEqual(csv_tweets[tw_n].text, TWEETS[tw_n].text)
+
 
 if __name__ == "__main__":
     unittest.main()
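The new setUp patches tweepy.API.user_timeline and feeds it TWEETS from a local tweets module, which is not part of this commit. Below is a minimal sketch of what that fixture could look like, assuming the tests only read the id_str, created_at and text attributes of each status; the first entry reuses the sample values from the old module-level constants (MAX_ID, DT and the tweet texts), while the second entry's id and timestamp are placeholders, and the real fixture would need NUM_TWEETS entries for the length assertions to pass.

# tweets.py -- hypothetical mock fixture, not the real data
from collections import namedtuple
from datetime import datetime

Tweet = namedtuple('Tweet', ['id_str', 'created_at', 'text'])

TWEETS = [
    Tweet(id_str='819831370113351680',                  # MAX_ID from the tests
          created_at=datetime(2017, 1, 13, 9, 0, 5),    # DT from the old test module
          text='5 cool things you can do with itertools https://t.co/Nk4s3yL6zL #python'),
    Tweet(id_str='819000000000000000',                  # placeholder id
          created_at=datetime(2017, 1, 12, 20, 0, 0),   # placeholder timestamp
          text='How to create a nice-looking HTML page of your #Kindle book highlights (notes) https://t.co/HKFK7inhUa #python'),
]

Because the patch targets the method on the tweepy.API class, every instance created inside the with block gets the canned return value, so the tests run offline and deterministically instead of depending on the module-level UserTweets call the old version made at import time.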