Skip to content
This repository was archived by the owner on May 21, 2023. It is now read-only.

Don't select a logfile, add an icon, be awesome. #21

Open
wants to merge 36 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
15d38eb
Remove UI for selecting logfile.
jerub Dec 8, 2012
6a9fecf
Add an icon that looks like a bell.
jerub Dec 8, 2012
6ddc915
Output to a logfile if there's an error in the program.
jerub Dec 8, 2012
fcba05d
Remove unneeded kwarg bundle_files=1 to distutils.setup()
jerub Dec 8, 2012
3dac7e9
Create README.md
jerub Dec 8, 2012
e6d14d3
Add support for fff as well as xxx.
jerub Dec 8, 2012
5885c52
Match names with numbers and apostraphes.
jerub Dec 9, 2012
288d24e
Dedup multiple messages sent in the same minute.
jerub Dec 9, 2012
fece451
Support hyphens in pilot names. Silly hyphens.
jerub Dec 9, 2012
ced8dff
Render text using HTML instead of StaticText
jerub Dec 11, 2012
7535d41
Update README.md
jerub Dec 13, 2012
2d5b2da
Add status bar that shows current action and latest log update.
jerub Dec 13, 2012
304b587
Make the release .exe file 8mb instead of 20
jerub Dec 13, 2012
a25a27e
Include the required 'ascii' codec.
jerub Dec 13, 2012
a8870c0
Don't exclude codecs.
jerub Dec 17, 2012
1ed2197
Entirely move KosLookupExe to using .format()
jerub Dec 17, 2012
4a07f6f
Add a menu with 'reset', 'about' and 'exit'
jerub Dec 17, 2012
e8b2c81
Only use sys.argv[0] for the icon if an exe.
jerub Dec 17, 2012
e67e812
Seek to end of FileTailer more efficiently.
jerub Dec 18, 2012
8ec6c7a
Implement live updates for koslookup
jerub Dec 18, 2012
27aad0d
Add more things to gitignore.
jerub Dec 21, 2012
7734987
Allow live updates from http://www.nrds.eu/
jerub Dec 22, 2012
ee08aee
Merge
jerub Dec 22, 2012
ab1d9f7
Simple fixes for live-updates.
jerub Dec 22, 2012
399ebad
Set release to 0.8b1.
jerub Dec 24, 2012
a1566d5
Fix timezone issues.
jerub Dec 24, 2012
1301215
Fix a bug with spaces in the filename.
jerub Dec 28, 2012
7c2ee2d
Update version to 0.8 release
jerub Dec 28, 2012
bbcb43c
Update version to 0.8.1
jerub Jan 26, 2013
4d57854
Fix py2exe build of 0.8.1
jerub Jan 26, 2013
b2b7368
Update ChatKosLookup.py
cainau Apr 13, 2013
f0a9657
Update ChatKosLookup_test.py
cainau Apr 13, 2013
3908a87
Count all CCP employees as being extremely KOS. Ding 4 times.
jerub May 27, 2013
f20ed9c
Merge
jerub May 27, 2013
4fde5b6
Merge
jerub May 27, 2013
c296092
Improve the regular expression for matching users with 3 names.
jerub May 27, 2013
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
*.pyc
build/
dist/
*.pyc
.*.sw?
*~
build/
dist/
202 changes: 148 additions & 54 deletions ChatKosLookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,128 @@

"""Checks pilots mentioned in the EVE chatlogs against a KOS list."""

import dbhash
import codecs
import collections
import datetime
import operator
import re
from evelink import api, eve
from evelink.cache.sqlite import SqliteCache
import sys, string, os, tempfile, time, json, urllib2, urllib
import sys, os, tempfile, time, json, urllib2, urllib

KOS_CHECKER_URL = 'http://kos.cva-eve.org/api/?c=json&type=unit&%s'
NPC = 'npc'
LASTCORP = 'lastcorp'

Entry = collections.namedtuple('Entry', 'pilots comment linekey')

class FileTailer:
def __init__(self, filename):
self.handle = open(filename, 'rb')
self.where = 0
MATCH = re.compile(
# each line starts with a byte order marker
u'\ufeff?'
# [ date time ]
ur'\[ (?P<date>\d+\.\d+\.\d+) (?P<time>\d+:\d+:\d+) \] '
# Pilot Name >
ur'(?P<pilot>[a-z0-9\'\-]+(?: [a-z0-9\'\-]+){0,2}) > '
# xxx or fff (any case), then the names of pilots
ur'(?:xxx|fff) (?P<names>[a-z0-9\'\- \r\n]+)'
# A hash then a comment
ur'(?:#(?P<comment>.*))?',
re.IGNORECASE)

def __init__(self, filename, encoding='utf-16'):
self.filename = filename
self.handle = codecs.open(filename, 'rb', encoding)

# seek to the end
fstat = os.fstat(self.handle.fileno())
self.handle.seek(fstat.st_size)
self.mtime = fstat.st_mtime

def close(self):
self.handle.close()

def poll(self):
self.where = self.handle.tell()
line = self.handle.readline()
if not line:
self.handle.seek(self.where)
return (None, None)
fstat = os.fstat(self.handle.fileno())
size = fstat.st_size
self.mtime = fstat.st_mtime
where = self.handle.tell()
while size > where:
try:
line = self.handle.readline()
except UnicodeError:
self.close()
raise
where = self.handle.tell()

answer = self.check(line)
if answer:
return answer

return None

def last_update(self):
return self.mtime

def check(self, line):
m = self.MATCH.match(line)
if not m:
return None

logdate = m.group('date')
logtime = m.group('time')
timestamp = datetime.datetime.strptime(
'{} {}'.format(logdate, logtime),
'%Y.%m.%d %H:%M:%S')
pilot = m.group('pilot')
names = m.group('names').replace(' ', '\n')
names = tuple(n.strip() for n in names.splitlines())
if m.group('comment'):
suffix = m.group('comment').strip()
comment = '[%s] %s > %s' % (logtime, pilot, suffix)
else:
sanitized = ''.join([x for x in line if x in string.printable and
x not in ['\n', '\r']])
if not '> xxx ' in sanitized:
return (None, None)
left, command = sanitized.split('> xxx ', 1)
timestamp = left.split(']', 1)[0].split(' ')[2]
person = left.split(']', 1)[1].strip()
mashup = command.split('#', 1)
names = mashup[0]
comment = '[%s] %s >' % (timestamp, person)
if len(mashup) > 1:
comment = '[%s] %s > %s' % (timestamp, person, mashup[1].strip())
return (names.split(' '), comment)
suffix = None
comment = '[%s] %s >' % (logtime, pilot)

linekey = (timestamp.hour, timestamp.minute, pilot, names, suffix)
return Entry(names, comment, linekey)


class DirectoryTailer:
def __init__(self, path):
self.path = path
self.watchers = {}
self.mtime = 0

for _answer in iter(self.poll, None):
pass

def last_update(self):
if self.watchers:
return max(w.last_update() for w in self.watchers.itervalues())
else:
return None

def poll(self):
st_mtime = os.stat(self.path).st_mtime
if st_mtime != self.mtime:
self.mtime = st_mtime
for name in os.listdir(self.path):
filename = os.path.join(self.path, name)
if filename in self.watchers:
continue
# anything within a day.
if abs(self.mtime - os.stat(filename).st_mtime) < 86400:
self.watchers[filename] = FileTailer(filename)

for filename, watcher in self.watchers.items():
try:
for answer in iter(watcher.poll, None):
return answer
except UnicodeError:
del self.watchers[filename]
return None


class KosChecker:
"""Maintains API state and performs KOS checks."""
Expand All @@ -51,30 +139,38 @@ def __init__(self):
def koscheck(self, player):
"""Checks a given player against the KOS list, including esoteric rules."""
kos = self.koscheck_internal(player)
if kos == None or kos == NPC:
# We were unable to find the player. Use employment history to
# get their current corp and look that up. If it's an NPC corp,
# we'll get bounced again.
history = self.employment_history(player)
cid = self.eve.character_id_from_name(player)
if kos not in (None, NPC):
return kos, cid

kos = self.koscheck_internal(history[0])
in_npc_corp = (kos == NPC)
# We were unable to find the player. Use employment history to
# get their current corp and look that up. If it's an NPC corp,
# we'll get bounced again.
history = self.employment_history(cid)

idx = 0
while kos == NPC and (idx + 1) < len(history):
idx = idx + 1
kos = self.koscheck_internal(history[idx])
in_npc_corp = False
for corp in history:
kos = self.koscheck_internal(corp)
if kos != NPC:
break
in_npc_corp = True

if in_npc_corp and kos != None and kos != NPC and kos != False:
kos = '%s: %s' % (LASTCORP, history[idx])
if kos == NPC:
kos = None

if kos == None or kos == NPC:
kos = False
if in_npc_corp and kos:
kos = '%s: %s' % (LASTCORP, kos)

return kos
return kos, cid

def koscheck_internal(self, entity):
"""Looks up KOS entries by directly calling the CVA KOS API."""
"""Looks up KOS entries by directly calling the CVA KOS API.

@returns: The reason this pilot is KOS.
"""
if entity.startswith('CCP '):
return 'CCP'

cache_key = self.api._cache_key(KOS_CHECKER_URL, {'entity': entity})

result = self.cache.get(cache_key)
Expand All @@ -83,33 +179,29 @@ def koscheck_internal(self, entity):
KOS_CHECKER_URL % urllib.urlencode({'q' : entity})))
self.cache.put(cache_key, result, 60*60)

kos = None
for value in result['results']:
# Require exact match (case-insensitively).
if value['label'].lower() != entity.lower():
continue
if value['type'] == 'alliance' and value['ticker'] == None:
# Bogus alliance created instead of NPC corp.
continue
kos = False
while True:
while value:
if value['kos']:
kos = '%s: %s' % (value['type'], value['label'])
if 'npc' in value and value['npc'] and not kos:
return '%s: %s' % (value['type'], value['label'])
if 'npc' in value and value['npc']:
# Signal that further lookup is needed of player's last corp
return NPC

if 'corp' in value:
value = value['corp']
elif 'alliance' in value:
value = value['alliance']
else:
return kos
break
return kos
return

def employment_history(self, character):
def employment_history(self, cid):
"""Retrieves a player's most recent corporations via EVE api."""
cid = self.eve.character_id_from_name(character)
cdata = self.eve.character_info_from_id(cid)
corps = cdata['history']
unique_corps = []
Expand Down Expand Up @@ -143,13 +235,15 @@ def koscheck_logentry(self, entry):
continue
person = person.strip(' .')
try:
result = self.koscheck(person)
if result != False:
kos.append((person, result))
reason, cid = self.koscheck(person)
if reason:
kos.append((person, reason, cid))
else:
notkos.append(person)
notkos.append((person, cid))
except:
error.append(person)
raise
kos.sort(key=operator.itemgetter(1, 0))
return (kos, notkos, error)


Expand Down
71 changes: 71 additions & 0 deletions ChatKosLookup_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import tempfile
import unittest
import os
import sys

sys.path.append('evelink-api')

import ChatKosLookup
from ChatKosLookup import Entry


class TestFileTailer(unittest.TestCase):
def setUp(self):
self.tmpfile = tempfile.mktemp()
open(self.tmpfile, 'w').close()
self.ft = ChatKosLookup.FileTailer(self.tmpfile)

def test_check_nothing(self):
line = "[ 2012.07.29 00:23:56 ] Foo Bar > nothing much"
answer = self.ft.check(line)
self.assertEquals(answer, None)

def test_check_kos_xxx(self):
answer = self.ft.check("[ 2012.07.29 00:23:56 ] Foo Bar > xxx Bad Pilot")
self.assertEquals(answer,
Entry(('Bad Pilot',), '[00:23:56] Foo Bar >',
(0, 23, 'Foo Bar', ('Bad Pilot',), None)))

def test_check_kos_comment(self):
answer = self.ft.check("[ 2012.07.29 00:23:56 ] Foo Bar > xxx Bad Pilot # 9uy")
self.assertEquals(answer,
Entry(('Bad Pilot',), '[00:23:56] Foo Bar > 9uy',
(0, 23, 'Foo Bar', ('Bad Pilot',), '9uy')))

def test_check_kos_fff(self):
answer = self.ft.check("[ 2012.07.29 00:23:56 ] Foo Bar > fff Bad Pilot")
self.assertEquals(answer,
Entry(('Bad Pilot',), '[00:23:56] Foo Bar >',
(0, 23, 'Foo Bar', ('Bad Pilot',), None)))

def test_check_kos_gq(self):
answer = self.ft.check("[ 2012.07.29 00:23:56 ] Foo Bar > xxx GQSmooth00")
self.assertEquals(answer,
Entry(('GQSmooth00',), '[00:23:56] Foo Bar >',
(0, 23, 'Foo Bar', ('GQSmooth00',), None)))

def test_check_kos_obrian(self):
answer = self.ft.check("[ 2012.07.29 00:23:56 ] O'Goode > xxx Bad O'brian")
self.assertEquals(answer,
Entry(('Bad O\'brian',), '[00:23:56] O\'Goode >',
(0, 23, 'O\'Goode', ('Bad O\'brian',), None)))

def test_check_kos_ii(self):
answer = self.ft.check("[ 2012.07.29 00:23:56 ] A -A > xxx I -I")
self.assertEquals(answer,
Entry(('I -I',), '[00:23:56] A -A >',
(0, 23, 'A -A', ('I -I',), None)))

def test_check_kos_three_names(self):
answer = self.ft.check("[ 2012.07.29 00:23:56 ] Admiral L Jenkins > xxx Admiral L Jenkins")
self.assertEquals(answer,
Entry(('Admiral L Jenkins',), '[00:23:56] Admiral L Jenkins >',
(0, 23, 'Admiral L Jenkins', ('Admiral L Jenkins',), None)))

def tearDown(self):
self.ft.close()
os.unlink(self.tmpfile)


if __name__ == '__main__':
unittest.main()
Loading