# Link-Optimizer / app.py
# dejanseo's picture
# Update app.py
# 1e86db5 verified
# raw
# history blame
# 11.8 kB
import streamlit as st
import torch
# Use AutoModel and AutoTokenizer for easier loading from Hub
from transformers import AutoModelForTokenClassification, AutoTokenizer
import numpy as np
import logging
from dataclasses import dataclass
from typing import Optional, Dict, List, Tuple
# --- PAGE CONFIG / HIDE STREAMLIT MENU ---
# The complete page configuration (title, wide layout, collapsed sidebar) is
# declared here, in the very first Streamlit command of the script. Older
# Streamlit releases only accept set_page_config when it runs before any
# other Streamlit command.
st.set_page_config(
    page_title="LinkBERT by DEJAN AI",
    layout="wide",
    initial_sidebar_state="collapsed",
)

# CSS injected into the page to hide the element with id "MainMenu".
hide_streamlit_style = """
<style>
#MainMenu {visibility: hidden;}
</style>
"""
st.markdown(hide_streamlit_style, unsafe_allow_html=True)

# Logo image; clicking it opens the given link.
st.logo(
    image="https://dejan.ai/wp-content/uploads/2024/02/dejan-300x103.png",
    link="https://dejan.ai/",
)

# ----------------------------------
# Logging
# ----------------------------------
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ----------------------------------
# Config
# ----------------------------------
@dataclass
class AppConfig:
    """Default settings for the LinkBERT application.

    Every field carries a default, so ``AppConfig()`` yields a ready-to-use
    configuration.
    """

    # Hugging Face Hub repository id handed to ``from_pretrained``.
    model_name: str = "dejanseo/link-prediction"
    # Length, in tokens, that every inference window is padded to.
    max_length: int = 512
    # Number of content tokens shared by two consecutive windows.
    doc_stride: int = 128
    # Evaluated once, when the class body runs: CUDA if available, else CPU.
    device: str = ("cuda" if torch.cuda.is_available() else "cpu")
# ----------------------------------
# Load model/tokenizer from Hugging Face Hub
# ----------------------------------
@st.cache_resource
def load_model_from_hub():
    """Fetch the fine-tuned model and its tokenizer from the Hugging Face Hub.

    Settings come from a fresh ``AppConfig``. The function is decorated with
    ``st.cache_resource``.

    Returns:
        A 5-tuple ``(model, tokenizer, device, max_length, doc_stride)`` where
        the last three values are copied from the configuration.
    """
    settings = AppConfig()
    repo_id = settings.model_name
    logger.info(f"Loading model and tokenizer from Hugging Face Hub: {repo_id}...")

    # The Auto* classes resolve the concrete architecture from the Hub repo.
    hub_model = AutoModelForTokenClassification.from_pretrained(repo_id)
    hub_tokenizer = AutoTokenizer.from_pretrained(repo_id)

    # Place the weights on the configured device and switch to evaluation mode.
    hub_model.to(settings.device)
    hub_model.eval()

    logger.info("Model and tokenizer loaded successfully.")
    return (
        hub_model,
        hub_tokenizer,
        settings.device,
        settings.max_length,
        settings.doc_stride,
    )
# Module-level handles used by the inference helpers below: the loaded model
# and tokenizer, the target device, and the windowing parameters.
(
    model,
    tokenizer,
    device,
    MAX_LENGTH,
    DOC_STRIDE,
) = load_model_from_hub()
# ----------------------------------
# Inference helpers
# ----------------------------------
def _content_start(built_ids: List[int], content_ids: List[int]) -> int:
    """Return the index at which ``content_ids`` begins inside ``built_ids``."""
    n_content = len(content_ids)
    n_extra = len(built_ids) - n_content
    for candidate in range(n_extra + 1):
        if built_ids[candidate:candidate + n_content] == content_ids:
            return candidate
    # Content not found verbatim: fall back to a single leading special token
    # when the tokenizer added anything at all.
    return 1 if n_extra > 0 else 0


def windowize_inference(
    plain_text: str, tokenizer: "AutoTokenizer", max_length: int, doc_stride: int
) -> List[Dict]:
    """Slice long text into overlapping, fixed-length windows for inference.

    The text is tokenized once without special tokens. Each window takes up to
    ``max_length - <special token count>`` content tokens, is wrapped with
    ``tokenizer.build_inputs_with_special_tokens`` and padded to ``max_length``.
    Consecutive windows overlap by ``doc_stride`` content tokens.

    Args:
        plain_text: Text to split.
        tokenizer: Tokenizer providing ``word_ids()`` and offset mappings on
            its encodings (both are called below).
        max_length: Padded length of every window, special tokens included.
        doc_stride: Number of content tokens shared by neighbouring windows.

    Returns:
        One dict per window with keys ``"input_ids"`` and ``"attention_mask"``
        (``torch.long`` tensors) plus ``"word_ids"`` and ``"offset_mapping"``
        (lists aligned index-for-index with ``input_ids``). Special-token and
        padding positions carry ``None`` / ``(0, 0)``. Returns ``[]`` when the
        tokenizer yields no tokens.

    Raises:
        ValueError: If ``max_length`` leaves no room for content tokens.
    """
    specials = tokenizer.num_special_tokens_to_add(pair=False)
    cap = max_length - specials

    full_encoding = tokenizer(
        plain_text, add_special_tokens=False, return_offsets_mapping=True, truncation=False
    )
    all_ids = list(full_encoding["input_ids"])
    total_tokens = len(all_ids)
    if total_tokens == 0:
        if len(plain_text) > 0:
            logger.warning("Tokenizer produced 0 tokens for a non-empty string.")
        return []

    if cap <= 0:
        raise ValueError(
            f"max_length ({max_length}) must be larger than the {specials} "
            "special tokens added by the tokenizer."
        )

    # Offsets and word ids come from the same special-token-free encoding, so
    # all three lists share one index space and can be sliced together.
    all_offsets = [tuple(span) for span in full_encoding["offset_mapping"]]
    all_word_ids = list(full_encoding.word_ids(batch_index=0))

    pad_id = tokenizer.pad_token_id
    step = max(cap - doc_stride, 1)
    windows_data = []
    start_token_idx = 0
    while start_token_idx < total_tokens:
        end_token_idx = min(start_token_idx + cap, total_tokens)
        ids_slice = all_ids[start_token_idx:end_token_idx]

        input_ids = list(tokenizer.build_inputs_with_special_tokens(ids_slice))
        # Measure where the content landed instead of assuming one leading and
        # one trailing special token; some tokenizers add none here.
        lead = _content_start(input_ids, ids_slice)
        trail = max(len(input_ids) - lead - len(ids_slice), 0)

        padding_length = max(max_length - len(input_ids), 0)
        attention_mask = [1] * len(input_ids) + [0] * padding_length
        input_ids.extend([pad_id] * padding_length)

        # Non-content positions get neutral placeholders so every entry of the
        # offset mapping is a (start, end) pair.
        filler = trail + padding_length
        window_offset_mapping = (
            [(0, 0)] * lead + all_offsets[start_token_idx:end_token_idx] + [(0, 0)] * filler
        )
        window_word_ids = (
            [None] * lead + all_word_ids[start_token_idx:end_token_idx] + [None] * filler
        )

        windows_data.append({
            "input_ids": torch.tensor(input_ids, dtype=torch.long),
            "attention_mask": torch.tensor(attention_mask, dtype=torch.long),
            "word_ids": window_word_ids[:max_length],
            "offset_mapping": window_offset_mapping[:max_length],
        })

        if end_token_idx >= total_tokens:
            break
        start_token_idx += step
    return windows_data
def classify_text(
    text: str, prediction_threshold_percent: float
) -> Tuple[str, Optional[str]]:
    """Score each word of ``text`` as a link candidate and render HTML.

    The text is split into overlapping windows, the model is run on each
    window, and the per-token probability of class index 1 is projected onto
    character positions (keeping the maximum where windows overlap). Token
    scores are then merged per word, and words whose score reaches the
    threshold are wrapped in a highlighted ``<span>``.

    Args:
        text: Raw user text.
        prediction_threshold_percent: Minimum link probability, in percent,
            for a word to be highlighted.

    Returns:
        ``(html, warning)``. ``warning`` is ``None`` on success; otherwise
        ``html`` is ``""`` and ``warning`` explains why nothing was produced.
    """
    # Local import keeps this block self-contained.
    from html import escape

    if not text.strip():
        return "", "Input text is empty."
    windows = windowize_inference(text, tokenizer, MAX_LENGTH, DOC_STRIDE)
    if not windows:
        return "", "Could not generate any windows for processing."

    # One slot per character of the input; holds the best score seen so far.
    char_link_probabilities = np.zeros(len(text), dtype=np.float32)
    with torch.no_grad():
        for window in windows:
            inputs = {
                'input_ids': window['input_ids'].unsqueeze(0).to(device),
                'attention_mask': window['attention_mask'].unsqueeze(0).to(device),
            }
            outputs = model(**inputs)
            probabilities = torch.softmax(outputs.logits, dim=-1).squeeze(0)
            # Column 1 is treated as the "link" class.
            link_probs = probabilities[:, 1].cpu().numpy()
            token_meta = zip(window['word_ids'], window['offset_mapping'])
            for i, (word_id, span) in enumerate(token_meta):
                # Check the word id first: special-token and padding positions
                # have no word and their offset entry is only a placeholder.
                if word_id is None:
                    continue
                start, end = span
                if start < end:
                    char_link_probabilities[start:end] = np.maximum(
                        char_link_probabilities[start:end], link_probs[i]
                    )

    final_threshold = prediction_threshold_percent / 100.0

    # Merge token scores into word scores, tracking each word's character span.
    full_encoding = tokenizer(text, return_offsets_mapping=True, truncation=False)
    word_ids = full_encoding.word_ids(batch_index=0)
    offsets = full_encoding['offset_mapping']
    word_max_prob_map: Dict[int, float] = {}
    word_char_spans: Dict[int, List[int]] = {}
    for word_id, (start_char, end_char) in zip(word_ids, offsets):
        if word_id is None or start_char >= end_char:
            continue
        token_max_prob = np.max(char_link_probabilities[start_char:end_char])
        if word_id not in word_max_prob_map:
            word_max_prob_map[word_id] = token_max_prob
            word_char_spans[word_id] = [start_char, end_char]
        else:
            word_max_prob_map[word_id] = max(word_max_prob_map[word_id], token_max_prob)
            word_char_spans[word_id][1] = end_char

    highlight_candidates: Dict[int, float] = {
        word_id: prob
        for word_id, prob in word_max_prob_map.items()
        if prob >= final_threshold
    }
    max_highlight_prob = max(highlight_candidates.values(), default=0.0)

    # Rebuild the text left to right. Every piece of user text is HTML-escaped
    # because the caller renders the result with unsafe_allow_html=True.
    html_parts, current_char = [], 0
    sorted_word_ids = sorted(word_char_spans, key=lambda k: word_char_spans[k][0])
    for word_id in sorted_word_ids:
        start_char, end_char = word_char_spans[word_id]
        if start_char > current_char:
            html_parts.append(escape(text[current_char:start_char], quote=False))
        word_text = escape(text[start_char:end_char], quote=False)
        if word_id in highlight_candidates:
            word_prob = highlight_candidates[word_id]
            # Opacity scales from 0.1 to 1.0 relative to the strongest candidate.
            normalized_opacity = 1.0
            if max_highlight_prob > 0:
                normalized_opacity = (word_prob / max_highlight_prob) * 0.9 + 0.1
            base_bg_color = "#D4EDDA"
            base_text_color = "#155724"
            html_parts.append(f"<span style='background-color: {base_bg_color}; color: {base_text_color}; "
                              f"padding: 0.1em 0.2em; border-radius: 0.2em; opacity: {normalized_opacity:.2f};'>"
                              f"{word_text}</span>")
        else:
            html_parts.append(word_text)
        current_char = end_char
    if current_char < len(text):
        html_parts.append(escape(text[current_char:], quote=False))
    return "".join(html_parts), None
# ----------------------------------
# Streamlit UI
# ----------------------------------
# NOTE(review): this is the second set_page_config call in the script and it
# runs after other Streamlit commands. Older Streamlit releases reject that;
# confirm the pinned version, or fold these options into the first call.
st.set_page_config(layout="wide", page_title="LinkBERT by DEJAN AI")
st.title("LinkBERT")

# Threshold tuning constants, all expressed in percent.
DEFAULT_THRESHOLD = 70.0
THRESHOLD_STEP = 10.0
THRESHOLD_BOUNDARY_PERCENT = 10.0

# Seed the session state on first run; existing values are left untouched.
_SESSION_DEFAULTS = {
    'current_threshold': DEFAULT_THRESHOLD,
    'output_html': "",
    'user_input': "DEJAN AI is the world's leading AI SEO agency. This tool showcases the capability of our latest link prediction model called LinkBERT. This model is trained on the highest quality organic link data and can predict natural link placement in plain text.",
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

# Input box, pre-filled with the text stored in session state.
user_input = st.text_area(
    label="Paste your text here:",
    value=st.session_state.user_input,
    height=200,
    key="text_area",
)

# The slider works in whole percent, so the stored threshold is truncated to int.
with st.expander('Settings'):
    slider_threshold = st.slider(
        "Link Probability Threshold (%)",
        min_value=0,
        max_value=100,
        value=int(st.session_state.current_threshold),
        step=1,
        help="The minimum probability for a word to be considered a link candidate.",
    )
def run_classification(new_threshold: float):
    """Classify the current text at ``new_threshold`` and rerun the script.

    Stores the threshold, the text-area content and the resulting HTML in
    ``st.session_state``. Any warning is queued under the
    ``classification_warning`` session key rather than rendered here:
    ``st.rerun()`` ends this run immediately, so a warning drawn now would
    disappear before the user could read it.

    Args:
        new_threshold: Link-probability threshold in percent.
    """
    st.session_state.current_threshold = float(new_threshold)
    st.session_state.user_input = user_input
    pending_warning = None
    if not st.session_state.user_input.strip():
        pending_warning = "Please enter some text to classify."
        st.session_state.output_html = ""
    else:
        with st.spinner("Processing..."):
            html, pending_warning = classify_text(
                st.session_state.user_input, st.session_state.current_threshold
            )
        st.session_state.output_html = html
    st.session_state.classification_warning = pending_warning
    st.rerun()


if st.button("Classify Text", type="primary"):
    run_classification(slider_threshold)

# Show a warning queued by the previous run exactly once.
queued_warning = st.session_state.get("classification_warning")
if queued_warning:
    del st.session_state["classification_warning"]
    st.warning(queued_warning)

if st.session_state.output_html:
    st.markdown("---")
    st.subheader(f"Results (Threshold: {st.session_state.current_threshold:.1f}%)")
    st.markdown(st.session_state.output_html, unsafe_allow_html=True)

# The adjustment buttons are always drawn and are disabled until there is a
# result to refine.
col1, col2, col3 = st.columns(3)
with col1:
    if st.button("Less", icon="➖", use_container_width=True, disabled=not st.session_state.output_html):
        # Fewer links means a higher threshold. Close to 100 % the step
        # shrinks to half of the remaining distance.
        current_thr = st.session_state.current_threshold
        if current_thr >= (100.0 - THRESHOLD_BOUNDARY_PERCENT):
            new_threshold = current_thr + (100.0 - current_thr) / 2.0
        else:
            new_threshold = current_thr + THRESHOLD_STEP
        run_classification(min(100.0, new_threshold))
with col2:
    if st.button("Default", icon="🔄", use_container_width=True, disabled=not st.session_state.output_html):
        run_classification(DEFAULT_THRESHOLD)
with col3:
    if st.button("More", icon="➕", use_container_width=True, disabled=not st.session_state.output_html):
        # More links means a lower threshold. Close to 0 % the threshold is
        # halved instead of reduced by a full step.
        current_thr = st.session_state.current_threshold
        if current_thr <= THRESHOLD_BOUNDARY_PERCENT:
            new_threshold = current_thr / 2.0
        else:
            new_threshold = current_thr - THRESHOLD_STEP
        run_classification(max(0.0, new_threshold))