''' Created on Oct 19, 2010 @author: Peter ''' from numpy import * import operator import codecs def loadDataSet(): postingList=[['my', 'dog', 'has', 'flea', 'problems', 'help', 'please'], ['maybe', 'not', 'take', 'him', 'to', 'dog', 'park', 'stupid'], ['my', 'dalmation', 'is', 'so', 'cute', 'I', 'love', 'him'], ['stop', 'posting', 'stupid', 'worthless', 'garbage'], ['mr', 'licks', 'ate', 'my', 'steak', 'how', 'to', 'stop', 'him'], ['quit', 'buying', 'worthless', 'dog', 'food', 'stupid']] classVec = [0,1,0,1,0,1] #1 is abusive, 0 not return postingList,classVec #統計列表中全部不重複的單詞,詞彙袋 def createVocabList(dataSet): #定義詞聚集 vocabSet = set([]) #遍歷數據集 for document in dataSet: #把每一個文檔合併到詞彙袋中 vocabSet = vocabSet | set(document) #union of the two sets return list(vocabSet) #把單詞轉換成向量,用詞袋模型,計算詞出現的次數 #inputSet是輸入語句 #vocalList是詞袋(訓練數據集中出現過的單詞) #returnVec是對比語句會讓詞袋後記錄出現的單詞的列表 def setOfWords2Vec(vocabList, inputSet): #建立一個空的單詞列 returnVec = [0]*len(vocabList) #遍歷數據集單詞 for word in inputSet: #存在單詞在詞袋中則 if word in vocabList: #index用於找到第一個與之匹配的下標 returnVec[vocabList.index(word)] = 1 else: print ("the word: %s is not in my Vocabulary!" % word) return returnVec #樸素貝葉斯分類訓練函數 #trainMatrix是訓練矩陣 #trainMatrix由來************ #訓練數據集,訓練標籤集=bayes.loadDataSet() #詞彙袋=bayes.createVocabList(訓練數據集) #初始化訓練矩陣 #for 文檔 in 訓練數據集:(數據集裏面有不少文檔) # 訓練矩陣.append(bayes.setOfWords2Vec(詞袋,文檔)) #出來的訓練矩陣就是每一個文檔中出如今詞袋中的單詞 #trainMatrix相似這樣([0,1,1,0],[1,0,1,0]...) def trainNB0(trainMatrix,trainCategory): #計算文檔的數目 numTrainDocs = len(trainMatrix) #計算詞袋單詞的數目 numWords = len(trainMatrix[0]) #計算類別的機率 abusive的文檔站文檔總數的百分比 pAbusive = sum(trainCategory)/float(numTrainDocs) #初始化計數器,1行*numWords列 #p0是not abusive p1是abusive #爲何是否是0矩陣? #由於若是是0的話,按照貝葉斯公式,p(W0|1)*p(W1|1) #若是任意機率爲0則全爲0因此初始值爲1 #同時下面分母初始化2 p0Num = ones(numWords); p1Num = ones(numWords) #初始化分母 p0Denom = 2.0; p1Denom = 2.0 #遍歷數據集中每一個文檔 for i in range(numTrainDocs): #若是這文檔是abusive類的計算它的abusive比例 if trainCategory[i] == 1: #儲存每一個詞在abusive下出現次數 p1Num += trainMatrix[i] #儲存abusive詞的總數目 p1Denom += sum(trainMatrix[i]) else: #儲存每一個詞在not abusive的出現次數 p0Num += trainMatrix[i] #儲存not abusive詞的總數目 p0Denom += sum(trainMatrix[i]) #使用log是爲了防止數據向下溢出 # 計算abusive下每一個詞出現的機率 p1Vect = log(p1Num/p1Denom) #change to log() # 計算not abusive下每一個詞出現的機率 p0Vect = log(p0Num/p0Denom) #change to log() #返回詞出現的機率和文檔爲abusice的機率 return p0Vect,p1Vect,pAbusive #貝葉斯分類 #第一個參數是要被訓練的向量 #後面三個參數是根據上面一個函數處理訓練樣本後獲得的參數 def classifyNB(vec2Classify, p0Vec, p1Vec, pClass1): #計算abusive的機率 p1 = sum(vec2Classify * p1Vec) + log(pClass1) #計算not abusive機率 p0 = sum(vec2Classify * p0Vec) + log(1.0 - pClass1) #看哪一個機率大 if p1 > p0: return 1 else: return 0 #計算文檔單詞在某個詞袋中出現的次數 # 把單詞轉換成向量,用對比已經有得詞袋,計算詞出現的次數 def bagOfWords2VecMN(vocabList, inputSet): returnVec = [0]*len(vocabList) for word in inputSet: if word in vocabList: returnVec[vocabList.index(word)] += 1 return returnVec #測試貝葉斯 def testingNB(): #加載數據集 listOPosts,listClasses = loadDataSet() #建立詞彙袋 myVocabList = createVocabList(listOPosts) trainMat=[] for postinDoc in listOPosts: trainMat.append(setOfWords2Vec(myVocabList, postinDoc)) p0V,p1V,pAb = trainNB0(array(trainMat),array(listClasses)) testEntry = ['love', 'my', 'dalmation'] thisDoc = array(setOfWords2Vec(myVocabList, testEntry)) print (testEntry,'classified as: ',classifyNB(thisDoc,p0V,p1V,pAb)) testEntry = ['stupid', 'garbage'] thisDoc = array(setOfWords2Vec(myVocabList, testEntry)) print (testEntry,'classified as: ',classifyNB(thisDoc,p0V,p1V,pAb)) #輸入字符串,輸出單詞列表 def textParse(bigString): import re listOfTokens = re.split(r'\W*', bigString) #返回小寫單詞列表 return [tok.lower() for tok in listOfTokens if len(tok) > 2] #垃圾郵寄過濾測試函數 def spamTest(): # 定義docList文檔列表,classList類別列表,fullText全部文檔詞彙 docList=[]; classList = []; fullText =[] #遍歷文件夾內文件 #咱們也能夠用這個來實現 # doclist=listdir('trainingDigits') # num=len(doclist) # for i in range(1,num) for i in range(1,26): # 定義並讀取垃圾郵件文件的詞彙分割列表 wordList = textParse(open('email/spam/%d.txt' % i).read()) # 將詞彙列表加到文檔列表中 docList.append(wordList) # 將全部詞彙列表彙總到fullText中 fullText.extend(wordList) # 文檔類別爲1,spam classList.append(1) # 讀取非垃圾郵件的文檔 wordList = textParse(open('email/ham/%d.txt' % i).read()) # 添加到文檔列表中 docList.append(wordList) # 添加到全部詞彙列表中 fullText.extend(wordList) # 類別爲0,非垃圾郵件 classList.append(0) # 建立詞彙列表 vocabList = createVocabList(docList) # 定義訓練集的索引和測試集 trainingSet = list(range(50)); testSet=[] # 隨機的選擇10個做爲測試集 for i in range(10): #隨機索引 randIndex = int(random.uniform(0,len(trainingSet))) #將隨機選擇的文檔加入測試集 testSet.append(trainingSet[randIndex]) #從訓練集中刪除隨機選擇的文檔 del(trainingSet[randIndex]) #定義訓練集的矩陣和類別 trainMat=[]; trainClasses = [] #遍歷訓練集,求先驗機率和條件機率 for docIndex in trainingSet: #將詞彙列表變成向量放到trainList中 trainMat.append(bagOfWords2VecMN(vocabList, docList[docIndex])) #添加訓練集的類標籤 trainClasses.append(classList[docIndex]) #計算先驗機率,條件機率 p0V,p1V,pSpam = trainNB0(array(trainMat),array(trainClasses)) errorCount = 0 #對測試集分類 for docIndex in testSet: #classify the remaining items #將測試集向量化 wordVector = bagOfWords2VecMN(vocabList, docList[docIndex]) #對測試數據進行分類 if classifyNB(array(wordVector),p0V,p1V,pSpam) != classList[docIndex]: #錯誤的話錯誤計數加一 errorCount += 1 print( "classification error",docList[docIndex]) print ('the error rate is: ',float(errorCount)/len(testSet)) #return vocabList,fullText