# coding: utf-8
"""ALS collaborative-filtering lab on Spark MLlib (notebook export).

The ``# In[N]:`` markers are the original Jupyter cell numbers; they are not
monotonic, so the cells were presumably executed out of order.

Steps performed by the script:

1. Start a SparkContext against the ``spark://bd-m:7077`` master.
2. Load ``/lab10/train.csv`` (userId, movieId, rating) and
   ``/lab10/test.csv`` (userId, movieId), skipping the header row.
3. Split the ratings 60/20/20 at random and sweep the ALS rank and lambda,
   printing MSE/RMSE measured on the validation part.
4. Train a final model on all ratings and write a prediction for every test
   pair to ``lab10.csv``.

Only single-argument ``print(...)`` calls are used so the file runs unchanged
on both Python 2 and Python 3.
"""

# In[1]:

import math
import os
import sys
import time

# Must be set before pyspark is imported: it is read when the JVM gateway is
# launched.
os.environ["PYSPARK_SUBMIT_ARGS"]=' --driver-memory 5g --packages com.databricks:spark-csv_2.10:1.1.0 pyspark-shell'

# Use the pyspark bundled with the Spark installation when SPARK_HOME is set;
# otherwise fall back to whatever pyspark is importable instead of failing
# with "NoneType + str".
spark_home = os.environ.get('SPARK_HOME')
if spark_home is not None:
    sys.path.insert(0, spark_home + "/python")

import py4j
from pyspark import SparkContext,SparkConf,SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel
from pyspark.mllib.recommendation import Rating

# Number of partitions requested for both input files.
NUM_PARTITIONS = 26

# Rating written for test pairs the model returns no prediction for.
# NOTE(review): presumably the global mean rating of the training set -
# confirm before reusing on other data.
DEFAULT_RATING = "3.67671059005"

# NOTE(review): "spark.driver.memory" below disagrees with the
# "--driver-memory 5g" passed through PYSPARK_SUBMIT_ARGS; the Spark docs say
# the SparkConf value is not applied in client mode - confirm which is meant.
conf = (SparkConf().setMaster("spark://bd-m:7077")
        .setAppName("lab09")
        .set("spark.executor.memory", "50g")
        .set("spark.driver.maxResultSize","5g")
        .set("spark.driver.memory","2g")
        .set("spark.cores.max", "26"))
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)


def read_csv_rows(path, parse_row):
    """Return an RDD of parsed rows of the comma-separated file at *path*.

    Lines are split on "," and the header line (first field ``userId``) is
    dropped; *parse_row* converts the remaining field lists into records.
    """
    return (sc.textFile(path, NUM_PARTITIONS)
            .map(lambda line: line.split(","))
            .filter(lambda fields: fields[0] != 'userId')
            .map(parse_row))


# In[2]:

ratings = read_csv_rows('/lab10/train.csv',
                        lambda x: (int(x[0]), int(x[1]), float(x[2])))
ratings.take(5)  # forces an early parse of the first rows


# In[3]:

test = read_csv_rows('/lab10/test.csv', lambda x: (int(x[0]), int(x[1])))
test.take(5)  # forces an early parse of the first rows


# In[4]:

rat = ratings.map(lambda r: Rating(r[0], r[1], r[2]))
rat.cache()
rat.first()


# In[14]:

# No seed is passed, so the split differs from run to run.
training,validation,testing = rat.randomSplit([0.6,0.2,0.2])


# In[15]:

print(training.count())
print(validation.count())
print(testing.count())


# In[16]:

training.cache()
validation.cache()


# In[17]:

def evaluate_model(model, dataset):
    """Return ``{'MSE': ..., 'RMSE': ...}`` of *model* on *dataset*.

    *dataset* is an RDD of (user, product, rating) records.  Only the pairs
    for which ``model.predictAll`` returns a prediction take part in the
    error (the join drops the others).

    Raises:
        ValueError: if no pair of *dataset* received a prediction.
    """
    testdata = dataset.map(lambda x: (x[0], x[1]))
    predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
    ratesAndPreds = dataset.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
    squared_errors = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1]) ** 2)
    # Sum and count in one job; a separate reduce() and count() would run
    # the whole predict-and-join pipeline twice.
    total, count = squared_errors.map(lambda e: (e, 1)).fold(
        (0.0, 0), lambda a, b: (a[0] + b[0], a[1] + b[1]))
    if count == 0:
        raise ValueError("no predictions matched the dataset; cannot compute MSE")
    MSE = total / count
    RMSE = math.sqrt(MSE)
    return {'MSE': MSE, 'RMSE': RMSE}


def fit_and_report(label, value, rank, num_iterations, lambda_):
    """Train ALS on ``training``, print validation MSE/RMSE, return the RMSE.

    *label* and *value* only name the swept hyper-parameter in the printed
    line, e.g. ``Rank = 25 MSE = ... RMSE = ...``.
    """
    swept_model = ALS.train(training, rank, num_iterations, lambda_)
    metrics = evaluate_model(swept_model, validation)
    print(label + " = " + str(value) + " MSE = " + str(metrics['MSE']) + " RMSE = " + str(metrics['RMSE']))
    return metrics['RMSE']


# In[12]:

rank=20
numIterations=30


# In[28]:

model = ALS.train(training, rank, numIterations)


# In[ ]:

# Sweep the rank at a fixed lambda.
numIterations=30
lambda_=0.085
ps = []
for rank in range(25,500,25):
    ps.append((rank, fit_and_report("Rank", rank, rank, numIterations, lambda_)))


# In[10]:

# Sweep lambda over several orders of magnitude at a small rank.
ls = []
rank=2
numIterations = 30
for lambda_ in [0.0001, 0.001, 0.01, 0.1, 1.0, 10.0, 100.0, 1000.0]:
    ls.append((lambda_, fit_and_report("Lambda", lambda_, rank, numIterations, lambda_)))


# In[23]:

ls = []
rank=250
numIterations = 30
for lambda_ in [0.085]:
    ls.append((lambda_, fit_and_report("Lambda", lambda_, rank, numIterations, lambda_)))

# Results recorded from earlier runs of this cell:
#Lambda = 0.1 MSE = 0.751080178965 RMSE = 0.866648821014
#Lambda = 0.075 MSE = 0.750219897276 RMSE = 0.866152352232
#Lambda = 0.07 MSE = 0.750033337876 RMSE = 0.866044651202
#Lambda = 0.08 MSE = 0.749335888762 RMSE = 0.865641894066
#Lambda = 0.09 MSE = 0.749929174577 RMSE = 0.865984511742
#rank 200 Lambda = 0.085 MSE = 0.709501168484 RMSE = 0.842318923261


# Final model, trained on all ratings.  This was a "%%time" cell magic in the
# notebook; it is inlined here so the script also runs outside IPython.
started = time.time()
rank=400
numIterations=30
lambda_=0.085
model = ALS.train(rat, rank, numIterations,lambda_)
predictions = model.predictAll(test).map(lambda r: (r[0], r[1], r[2]))
# predictions is lazy, so this mostly measures the training.
print("Wall time: %.1f s" % (time.time() - started))


# In[7]:

te=test.collect()
# Order by (userId, movieId).  The notebook used the key
# userId*1000000+movieId, which gives the same order only while
# movieId < 1000000.
base=sorted(te)


# In[9]:

# One lookup table keyed by (userId, movieId).
predicted = predictions.map(lambda p: ((p[0], p[1]), p[2])).collectAsMap()

lines = ["userId,movieId,rating"]
for user_id, movie_id in base:
    rating = predicted.get((user_id, movie_id))
    rating_text = DEFAULT_RATING if rating is None else str(rating)
    lines.append(str(user_id) + "," + str(movie_id) + "," + rating_text)
s = "\r\n".join(lines) + "\r\n"


# In[12]:

# Binary mode keeps the explicit "\r\n" endings byte-exact on every platform.
with open("lab10.csv", "wb") as text_file:
    text_file.write(s.encode("ascii"))
# Source: https://habr.com/ru/post/268073/