# Imports used by this snippet: annotators from spark-nlp, plus feature transformers and LDA from Spark MLlib.
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer, Normalizer, Stemmer
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.clustering import LDA
from pyspark.sql.functions import col, concat, lit, regexp_replace

# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.
df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
        # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be placed between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
)
# Now, we begin assembling our pipeline. Each component here applies some transformation to the data.
# The Document Assembler takes the raw text data and converts it into a format that can
# be tokenized. It becomes one of spark-nlp's native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, it is effectively taking a string and splitting
# it along the spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
# The Normalizer cleans up the tokens, stripping out punctuation and other characters we don't want.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")
# The Stemmer takes objects of class "Token" and reduces the words to their
# root form. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")
# The Finisher converts the spark-nlp annotations back into plain Spark columns, which
# allows us to access the data outside of spark-nlp components. For instance, we can now
# feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")
# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly articles and prepositions, such as
# the words "the" and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")
# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")
# Here we implement the IDF portion. IDF (inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")
# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics", or groups of commonly appearing words.
lda = LDA(k=10, maxIter=10)
# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the order provided to the "stages" parameter.
pipeline = Pipeline(
    stages=[
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)
# We fit the pipeline to the training data.
model = pipeline.fit(df_train)
# Now that we have completed a pipeline, we want to output the topics in a human-readable form.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model, and do the appropriate mapping. The output from each individual component lives
# in the model object. We can access each one by referring to its position in
# the pipeline via model.stages[<ind>].
# Let's create a reference to our vocabulary.
vocab = model.stages[-3].vocabulary
# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python list.
raw_topics = model.stages[-1].describeTopics().collect()
# Lastly, let's get the indices of the vocabulary terms from our topics.
topic_inds = [ind.termIndices for ind in raw_topics]
# The indices we just grabbed map directly to the term at position <ind> in our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
After Change
# Since some of our data doesn't have a body, we can combine all of the text
# for the titles and bodies so that every row has useful data.
df_train = (
    reddit_data
    # Replace null values with an empty string
    .fillna("")
    .select(
        # Combine columns
        concat(
            # First column to concatenate. col() is used to specify that we're referencing a column
            col("title"),
            # Literal character that will be placed between the concatenated columns.
            lit(" "),
            # Second column to concatenate.
            col("body")
        # Change the name of the new column
        ).alias("text")
    )
    # The text has several tags, including [REMOVED] or [DELETED] for redacted content.
    # We'll replace these with empty strings.
    .select(
        regexp_replace(col("text"), r"\[.*?\]", "")
            .alias("text")
    )
)
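# As a quick sanity check (not part of the original snippet), we can peek at a few rows to
# confirm that titles and bodies were concatenated and the [REMOVED]/[DELETED] tags are gone.
# show() is a standard Spark DataFrame method; the row count and truncation width are arbitrary.
df_train.show(5, truncate=60)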
# Now, we begin assembling our pipeline. Each component here applies some transformation to the data.
# The Document Assembler takes the raw text data and converts it into a format that can
# be tokenized. It becomes one of spark-nlp's native object types, the "Document".
document_assembler = DocumentAssembler().setInputCol("text").setOutputCol("document")
# The Tokenizer takes data that is of the "Document" type and tokenizes it.
# While slightly more involved than this, it is effectively taking a string and splitting
# it along the spaces, so each word is its own string. The data then becomes the
# spark-nlp native type "Token".
tokenizer = Tokenizer().setInputCols(["document"]).setOutputCol("token")
# The Normalizer cleans up the tokens, stripping out punctuation and other characters we don't want.
normalizer = Normalizer().setInputCols(["token"]).setOutputCol("normalizer")
# The Stemmer takes objects of class "Token" and reduces the words to their
# root form. For instance, the words "cars", "cars'" and "car's" would all be replaced
# with the word "car".
stemmer = Stemmer().setInputCols(["normalizer"]).setOutputCol("stem")
# The Finisher converts the spark-nlp annotations back into plain Spark columns, which
# allows us to access the data outside of spark-nlp components. For instance, we can now
# feed the data into components from Spark MLlib.
finisher = Finisher().setInputCols(["stem"]).setOutputCols(["to_spark"]).setValueSplitSymbol(" ")
# Stopwords are common words that generally don't add much detail to the meaning
# of a body of text. In English, these are mostly articles and prepositions, such as
# the words "the" and "of".
stopword_remover = StopWordsRemover(inputCol="to_spark", outputCol="filtered")
# Here we implement TF-IDF as an input to our LDA model. CountVectorizer (TF) keeps track
# of the vocabulary that's being created so we can map our topics back to their
# corresponding words.
# TF (term frequency) creates a matrix that counts how many times each word in the
# vocabulary appears in each body of text. This then gives each word a weight based
# on its frequency.
tf = CountVectorizer(inputCol="filtered", outputCol="raw_features")
# Here we implement the IDF portion. IDF (inverse document frequency) reduces
# the weights of commonly-appearing words.
idf = IDF(inputCol="raw_features", outputCol="features")
# LDA creates a statistical representation of how frequently words appear
# together in order to create "topics", or groups of commonly appearing words.
# In this case, we'll create 5 topics.
lda = LDA(k=5)
# We add all of the transformers into a Pipeline object. Each transformer
# will execute in the order provided to the "stages" parameter.
pipeline = Pipeline(
    stages=[
        document_assembler,
        tokenizer,
        normalizer,
        stemmer,
        finisher,
        stopword_remover,
        tf,
        idf,
        lda
    ]
)
# We fit the pipeline to the training data.
model = pipeline.fit(df_train)
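# Beyond the topic-word lists built below, the fitted pipeline can also score each post.
# This step isn't in the original snippet, but transform() is the standard PipelineModel
# method, and "topicDistribution" is LDA's default output column; df_topics is just an
# illustrative local name.
df_topics = model.transform(df_train)
df_topics.select("topicDistribution").show(3, truncate=80)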
# Now that we have completed a pipeline, we want to output the topics in a human-readable form.
# To do this, we need to grab the vocabulary generated from our pipeline, grab the topic
# model, and do the appropriate mapping. The output from each individual component lives
# in the model object. We can access each one by referring to its position in
# the pipeline via model.stages[<ind>].
# Let's create a reference to our vocabulary.
vocab = model.stages[-3].vocabulary
# Next, let's grab the topics generated by our LDA model via describeTopics(). Using collect(),
# we load the output into a Python list.
raw_topics = model.stages[-1].describeTopics(maxTermsPerTopic=5).collect()
# Lastly, let's get the indices of the vocabulary terms from our topics.
topic_inds = [ind.termIndices for ind in raw_topics]
# The indices we just grabbed map directly to the term at position <ind> in our vocabulary.
# Using the below code, we can generate the mappings from our topic indices to our vocabulary.
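# The mapping code itself isn't shown above, so here is a minimal sketch of what it could
# look like, using only the vocab and topic_inds objects defined earlier: look up each term
# index in the CountVectorizer vocabulary and print the resulting word list for each topic.
topics = [[vocab[idx] for idx in topic] for topic in topic_inds]
for i, topic in enumerate(topics):
    print("topic {}: {}".format(i, topic))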