캐싱을 사용하면 함수의 출력을 저장하여 동일한 입력에 대한 함수를 재실행을 방지합니다. 세션을 사용하면 , 페이지 새로고침이나 위젯 상호작용 시에도 각 사용자에 대한 정보를 유지할 수 있습니다.
캐싱
캐싱의 기본 개념은 함수 호출의 결과를 저장했다가 동일한 입력이 함수에 제공될때 함수를 실행하지 않고 캐시된 결과를 반환하는 것입니다. 이렇게 하면 동일한 입력값을 가진 함수가 반복적으로 실행되는 것을 방지하여 성능향상을 가져올 수 있습니다.
캐싱 효과를 비교 확인하기 위해서 우선은 캐싱없이 동작하는 예제를 실행해봅니다.
import streamlit as st import time import pandas as pd import numpy as np defexpensive_computation(): """시간이 오래 걸리는 연산을 시뮬레이션하는 함수""" time.sleep(3) # 3초 대기 (실제 연산 시뮬레이션) if st.button("함수 10번 실행"): execution_times = [] total_start_time = time.time() for i in range(10): start_time = time.time() result = expensive_computation() execution_time = time.time() - start_time execution_times.append(execution_time) st.write(f"{i+1}번째 실행 시간: {execution_time:.2f}초") total_execution_time = time.time() - total_start_time # 결과 요약 st.write("---") st.write(f"**총 실행 시간: {total_execution_time:.2f}초**") st.write(f"**평균 실행 시간: {sum(execution_times)/len(execution_times):.2f}초**")
실행결과입니다. 함수 호출할때 마다 3초씩 걸려 총 10번 함수를 실행하여 30초가 걸렸습니다.
이제 캐싱을 사용하여 동작하는 코드를 실행해봅니다.
import streamlit as st import time import pandas as pd import numpy as np @st.cache_data defexpensive_computation(): """시간이 오래 걸리는 연산을 시뮬레이션하는 함수""" time.sleep(3) # 3초 대기 (실제 연산 시뮬레이션) if st.button("함수 10번 실행"): execution_times = [] total_start_time = time.time() for i in range(10): start_time = time.time() result = expensive_computation() execution_time = time.time() - start_time execution_times.append(execution_time) st.write(f"{i+1}번째 실행 시간: {execution_time:.2f}초") total_execution_time = time.time() - total_start_time # 결과 요약 st.write("---") st.write(f"**총 실행 시간: {total_execution_time:.2f}초**") st.write(f"**평균 실행 시간: {sum(execution_times)/len(execution_times):.2f}초**")
실행 결과 첫번째 함수 실행만 3초가 걸리구 이후에는 0초 걸리는 것을 볼 수 있습니다. 처음에만 실제 함수가 실행되어 함수 결과가 캐싱되고 이후에는 저장된 결과를 사용하기 때문에 실행하는데 걸리는 시간이 0초가 된 것입니다.
캐싱시 두가지 캐싱 데코레이터를 사용할 수 있습니다. 일반적인 데이터를 캐싱하는 경우에는 앞에서 사용했던 @st.cache_data를 사용하면 되고 모델, 데이터베이스 연결같은 것을 캐싱하는 경우에는 @st.cache_resource를 사용하면 됩니다. 함수 선언 앞에 적어주기만 하면 됩니다.
코드를 수정하여 함수 리턴값을 화면에 출력하도록 해봅니다.
import streamlit as st import time import pandas as pd import numpy as np @st.cache_data defexpensive_computation(i): """시간이 오래 걸리는 연산을 시뮬레이션하는 함수""" time.sleep(3) # 3초 대기 (실제 연산 시뮬레이션) return i*3 if st.button("함수 10번 실행"): execution_times = [] total_start_time = time.time() for i in range(10): start_time = time.time() result = expensive_computation(i) execution_time = time.time() - start_time execution_times.append(execution_time) st.write(f"{i+1}번째 실행 시간: {execution_time:.2f}초, 리턴값: {result}") total_execution_time = time.time() - total_start_time # 결과 요약 st.write("---") st.write(f"**총 실행 시간: {total_execution_time:.2f}초**") st.write(f"**평균 실행 시간: {sum(execution_times)/len(execution_times):.2f}초**")
처음 실행시에는 3초 간격으로 실행결과가 출력되었습니다. 인덱스에 3을 곱한 값이 출력됩니다. 인덱스가 0부터 시작하므로 2번째 실행 시간옆에 3이 출력됩니다.
다시 실행해보면 캐싱을 사용하기 때문에 모든 실행하는데 걸리는 시간이 0초가 됩니다.
인덱스의 4배가 되도록 함수를 수정해봅니다.
import streamlit as st import time import pandas as pd import numpy as np @st.cache_data defexpensive_computation(i): """시간이 오래 걸리는 연산을 시뮬레이션하는 함수""" time.sleep(3) # 3초 대기 (실제 연산 시뮬레이션) return i*4 if st.button("함수 10번 실행"): execution_times = [] total_start_time = time.time() for i in range(10): start_time = time.time() result = expensive_computation(i) execution_time = time.time() - start_time execution_times.append(execution_time) st.write(f"{i+1}번째 실행 시간: {execution_time:.2f}초, 리턴값: {result}") total_execution_time = time.time() - total_start_time # 결과 요약 st.write("---") st.write(f"**총 실행 시간: {total_execution_time:.2f}초**") st.write(f"**평균 실행 시간: {sum(execution_times)/len(execution_times):.2f}초**")
실행해보면 함수의 리턴값이 바뀌었기 때문에 캐싱을 사용할 수 없어서 모든 함수 호출시 3초가 걸리게 됩니다.
세션 상태
세션 상태는 웹브라우저 탭마다 개별적으로 유지되는 메모리로 파이썬 코드가 다시 실행되더라도 값이 사라지지 않고 유지됩니다. 각 사용자/탭마다 독립적인 상태가 유지됩니다.
import streamlit as st # ============================================ # 1. 일반 변수 # ============================================ st.header("일반 변수") normal_count = 0# 매번 0으로 초기화 st.write(f"초기값: {normal_count}") if st.button("증가 버튼 (일반 변수)", key="btn1"): normal_count += 1# 0에서 1로 증가 st.write(f"버튼 밖에서 출력: {normal_count}") # 다시 0 # ============================================ # 2. 세션 상태 # ============================================ st.header("세션 상태") if"session_count"notin st.session_state: st.session_state.session_count = 0 if st.button("증가 버튼 (세션 상태)"): st.session_state.session_count += 1 # 버튼 처리 후에 값 출력 st.write(f"현재값: {st.session_state.session_count}") st.write(f"버튼 밖에서 출력: {st.session_state.session_count}")
실행해봅니다.
아래 스크린샷은 각 버튼을 두번 클릭시 결과입니다.
일반 변수에 있는 버튼을 두번 클릭하면 버튼 클릭할때 마다 전체 코드가 다시 실행되어 normal_count 변수의 값이 항상 0으로 초기화되기 때문에 버튼 클릭하여 증가한 값이 항상 1로 출력됩니다.
세션 상태에 있는 버튼을 두번 클릭하면 버튼을 클릭할때마다 전체 코드가 다시 실행되지만 세션 상태의 session_count 변수의 값이 보존되기 때문에 버튼 클릭시 마다 1씩 증가한 값이 출력됩니다.
이 상황에서 F5를 누르면 세션 상태의 변수 값도 유지가 되지 않기 때문에 0으로 초기화되는 것을 볼 수 있습니다. 세션 상태는 탭이 유지된 상태에서 UI 조작이 있는 상황에서만 세션 상태에 있는 값들이 유지됩니다.
세션 상태를 사용하는 방법은 두가지가 있습니다. 첫번째 방법 사용시 공백이나 특수 문자가 가능하지만 두번째 방법을 사용시엔 파이썬 변수처럼 적어야 해서 공백이나 특수 문자가 불가능합니다.
앞에서 잠깐 언급했던 @st.cache_resource를 사용하여 연결을 캐시할 수 도 있지만 데이터베이스 연결을 위해 st.connection를 사용하는 것이 좋습니다.
st.connection이 캐시보다 더 좋은 점은 st.connection은 별도의 캐싱 데코레이터 없이도 연결과 쿼리 결과를 자동으로 캐싱해주므로 코드가 더 간결해집니다. 또한 데이터베이스 비밀번호나 API 키를 secrets.toml 파일로 안전하게 분리 관리할 수 있어 보안성이 뛰어납니다. 마지막으로 데이터베이스 연결 풀링과 재사용을 자동으로 최적화하여 매번 새로운 연결을 생성하지 않아도 되므로 성능이 크게 향상됩니다.
테스트해볼 수 있는 예제 코드입니다. 편의상 필요한 파일을 모두 자동으로 생성하도록 했습니다. 테스트하려면 추가로 sqlalchemy 패키지를 설치해야합니다.
import streamlit as st import sqlite3 import pandas as pd import os st.title("데이터베이스 연결 테스트 (자동 설정)") defcreate_secrets_file(): """secrets.toml 파일을 자동으로 생성하는 함수""" # .streamlit 폴더가 없으면 생성 ifnot os.path.exists('.streamlit'): os.makedirs('.streamlit') st.info("📁 .streamlit 폴더를 생성했습니다.") # secrets.toml 파일 생성 secrets_content = """[connections.my_database] type = "sql" url = "sqlite:///test.db" """ with open('.streamlit/secrets.toml', 'w', encoding='utf-8') as f: f.write(secrets_content) st.success("✅ .streamlit/secrets.toml 파일을 생성했습니다!") returnTrue defcreate_gitignore(): """보안을 위한 .gitignore 파일 생성""" gitignore_content = """# Streamlit secrets .streamlit/secrets.toml # Database files *.db # Python __pycache__/ *.pyc *.pyo """ ifnot os.path.exists('.gitignore'): with open('.gitignore', 'w', encoding='utf-8') as f: f.write(gitignore_content) st.info("🔒 .gitignore 파일을 생성했습니다. (보안상 중요!)") defcreate_test_database(): """테스트용 SQLite 데이터베이스 생성""" ifnot os.path.exists('test.db'): with st.spinner("테스트 데이터베이스 생성 중..."): conn = sqlite3.connect('test.db') df_sample = pd.DataFrame({ 'id': [1, 2, 3, 4, 5], 'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'], 'age': [25, 30, 35, 28, 32], 'city': ['Seoul', 'Busan', 'Incheon', 'Daegu', 'Gwangju'] }) df_sample.to_sql('my_table', conn, if_exists='replace', index=False) conn.close() st.success("📊 테스트 데이터베이스를 생성했습니다!") returnTrue returnFalse # 자동 설정 실행 st.subheader("🔧 자동 설정") # 1. .gitignore 파일 생성 (보안상 먼저) create_gitignore() # 2. secrets.toml 파일 확인 및 생성 ifnot os.path.exists('.streamlit/secrets.toml'): st.warning("⚠️ secrets.toml 파일이 없습니다. 자동으로 생성합니다...") if create_secrets_file(): st.info("🔄 페이지를 새로고침하여 연결을 테스트하세요.") st.stop() else: st.success("✅ secrets.toml 파일이 이미 존재합니다.") # 3. 테스트 데이터베이스 생성 create_test_database() st.subheader("📁 생성된 파일들") col1, col2, col3 = st.columns(3) with col1: st.write("📄 .streamlit/secrets.toml", "✅"if os.path.exists('.streamlit/secrets.toml') else"❌") with col2: st.write("📄 .gitignore", "✅"if os.path.exists('.gitignore') else"❌") with col3: st.write("📄 test.db", "✅"if os.path.exists('test.db') else"❌") # 데이터베이스 연결 및 조회 st.subheader("🔗 데이터베이스 연결 테스트") try: conn = st.connection("my_database") df = conn.query("SELECT * FROM my_table") st.success("✅ 데이터베이스 연결 성공!") st.subheader("📊 데이터베이스에서 가져온 데이터:") st.dataframe(df) # 추가 쿼리 예제 st.subheader("🔍 필터링 예제 (나이 30 이상):") df_filtered = conn.query("SELECT * FROM my_table WHERE age >= 30") st.dataframe(df_filtered) # 통계 정보 st.subheader("📈 간단한 통계:") col1, col2, col3 = st.columns(3) with col1: st.metric("총 인원", len(df)) with col2: st.metric("평균 나이", f"{df['age'].mean():.1f}세") with col3: st.metric("최고 나이", f"{df['age'].max()}세") except Exception as e: st.error(f"❌ 데이터베이스 연결 오류: {e}") st.info("아래 설정을 확인하세요:") if os.path.exists('.streamlit/secrets.toml'): st.subheader("현재 secrets.toml 내용:") with open('.streamlit/secrets.toml', 'r', encoding='utf-8') as f: st.code(f.read(), language='toml')
Comments ()