我正在嘗試使用在 Jupyter Notebook 中撰寫的 Python 代碼,將推文流式傳輸到具有 PostGIS 擴展的 PostgreSQL 資料庫中,但沒有成功。我參考了很多教程,在第一次嘗試中,代碼似乎可以正常運作并且沒有報錯,我甚至列印出了已連接到 Twitter API 的訊息。但是,沒有任何推文被寫入 PostgreSQL 資料庫。我原本認為問題可能出在過濾器上(也許我使用的過濾條件當下剛好沒有推文),但是在多次運行、刪除過濾器或改用其他過濾器之后,我發現這不是問題所在。我認為與 PostgreSQL 的連接也不是問題,因為我嘗試將推文直接列印到 Jupyter Notebook 中,結果既沒有錯誤,也沒有任何輸出。
在根據指南進行一些更改并檢查PostgreSQL表的格式后,我看到代碼連接到 Twitter API,但我一直收到此訊息:'str' object is not callable
PostgreSQL 中的表是使用下面的代碼創建的,目的是將推文的坐標存儲為點幾何(point geometry):
-- One row per stored tweet, keyed by tweet_id. `coordinates` is a PostGIS GEOMETRY
-- column intended to hold the tweet's location as a point geometry.
CREATE TABLE tweets (tweet_id VARCHAR PRIMARY KEY, user_id VARCHAR, username TEXT, tweet TEXT, hashtags TEXT, lang TEXT, created_at TIMESTAMP, coordinates GEOMETRY);
使用的 Python 代碼如下:
#!/usr/bin/env python
# coding: utf-8
#Import libraries
import tweepy
import pandas as pd
import json
import psycopg2
import time
from html.parser import HTMLParser
# Twitter API credentials: consumer key/secret and access token/secret.
# The values below are placeholders.
# NOTE(review): prefer loading real secrets from the environment or a config
# file kept out of version control rather than hard-coding them here.
ckey = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
csecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
atoken = "xxxxxxxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
asecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# Build the OAuth handler from the consumer credentials and attach the
# access token so requests are made on behalf of that account.
auth = tweepy.OAuthHandler(ckey, csecret)
auth.set_access_token(atoken, asecret)
# API client bound to the handler above; its `.auth` attribute is what the
# stream set-up further down passes to tweepy.Stream.
api = tweepy.API(auth)
#Define Listener block
class MyStreamListener(tweepy.StreamListener):
    """Stream listener that hands geotagged tweets to ``dbConnect``.

    The listener records its creation time and asks the stream to stop
    (by returning ``False`` from ``on_data``) once ``time_limit`` seconds
    have elapsed.

    Parameters
    ----------
    time_limit : int or float, optional
        Maximum streaming duration in seconds (default 300).
    """

    def __init__(self, time_limit=300):
        self.start_time = time.time()
        self.limit = time_limit
        super(MyStreamListener, self).__init__()

    def on_connect(self):
        """Report that the streaming connection was established."""
        print("Connected to Twitter API.")

    def on_status(self, status):
        """Print the text of a parsed status.

        NOTE: tweepy's base-class ``on_data`` is what dispatches to
        ``on_status``; because ``on_data`` is overridden below without
        calling the base implementation, this hook is not invoked.
        """
        print(status.text)

    def on_data(self, raw_data):
        """Parse one raw stream message and store it if it is geotagged.

        Parameters
        ----------
        raw_data : str
            JSON text of a single message received from the stream.

        Returns
        -------
        bool
            ``False`` once the time limit has been exceeded (the stream
            then stops), ``True`` otherwise.
        """
        try:
            datos = json.loads(raw_data)
            # Not every stream message carries a "coordinates" key, so look
            # it up defensively instead of raising KeyError on each one.
            geo = datos.get("coordinates") if isinstance(datos, dict) else None
            # Filter only tweets with coordinates
            if geo is not None:
                # Obtain all the variables to store in each column
                tweet_id = datos['id_str']
                user_id = datos['user']['id']
                user_name = datos['user']['name']
                tweet = datos['text']
                hashtags = datos["entities"]["hashtags"]
                lang = datos['user']['lang']
                created_at = datos['created_at']
                coordinates = geo["coordinates"]
                # Store the tweet in the database
                dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates)
        except Exception as e:
            # Best-effort: one malformed message or failed insert must not
            # terminate the whole stream, so report it and keep going.
            print(e)
        # Checked outside the try block so the limit is enforced for every
        # message, including those that raised or had no coordinates.
        if (time.time() - self.start_time) > self.limit:
            print(time.time(), self.start_time, self.limit)
            return False
        return True

    def on_error(self, status_code):
        """Handle an HTTP error status reported by the stream.

        Returns ``False`` for status 420 so the stream disconnects instead
        of retrying; other status codes fall through and return ``None``.
        """
        if status_code == 420:
            # Returning False in on_error disconnects the stream
            return False
def dbConnect(tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, coordinates):
    """Insert one tweet into the ``tweets`` table of the PostgreSQL database.

    Parameters
    ----------
    tweet_id, user_id, user_name, tweet, lang, created_at
        Values stored in the columns of the same meaning.
    hashtags : str or list
        Either ready-made text or the list of hashtag entities from the
        tweet payload; a list is flattened to comma-separated text because
        the column is TEXT.
    coordinates : sequence of two numbers or None
        Point position; ``None`` stores a NULL geometry.
        NOTE(review): assumed to be GeoJSON order ``[longitude, latitude]``
        in WGS 84 (SRID 4326) -- confirm against the data source.

    Raises
    ------
    psycopg2.Error
        If connecting or inserting fails; the transaction is rolled back
        and the connection is closed before the error propagates.
    """
    # The hashtags column is TEXT, so collapse a list of entities into text.
    if not isinstance(hashtags, str):
        hashtags = ",".join(
            str(tag.get("text", "")) if isinstance(tag, dict) else str(tag)
            for tag in (hashtags or [])
        )
    longitude, latitude = coordinates if coordinates is not None else (None, None)

    # Column is named `username` in the CREATE TABLE statement. The point is
    # built server-side so the GEOMETRY column receives a real geometry.
    command = (
        "INSERT INTO tweets (tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates) "
        "VALUES (%s,%s,%s,%s,%s,%s,%s,ST_SetSRID(ST_MakePoint(%s,%s),4326))"
    )
    params = (tweet_id, user_id, user_name, tweet, hashtags, lang, created_at, longitude, latitude)

    #Connect to Twitter database created in PostgreSQL
    conn = psycopg2.connect(host="localhost",database="datos_twitter",port=5433,user="xxxxxxx",password="xxxxxxx")
    try:
        # `with conn` commits on success and rolls back on error;
        # `with conn.cursor()` closes the cursor in either case.
        with conn:
            with conn.cursor() as cur:
                # The values go in as the SECOND argument of execute();
                # `command(...)` tried to call the SQL string itself, which
                # is what raised "'str' object is not callable".
                cur.execute(command, params)
    finally:
        conn.close()
# Start streaming: create the listener (default time limit of 300 seconds)
# and bind it to a stream that reuses the API client's credentials.
myStreamListener = MyStreamListener()
# NOTE(review): `tweet_mode` may not be an option tweepy.Stream recognises;
# confirm against the installed tweepy version whether it has any effect.
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener, tweet_mode="extended")
# Filter tweets by bounding box and keywords. This call runs the stream in
# the calling thread until the listener stops it.
# NOTE(review): the box is presumably [west_lon, south_lat, east_lon, north_lat],
# and the streaming API may combine `locations` and `track` with OR rather
# than AND -- verify against the Twitter filter documentation.
myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Madrid', 'madrid'])
uj5u.com熱心網友回復:
on_data 的引數是原始資料(從 Twitter API 端點接收到的 JSON 字串):
def on_data(self, raw_data):
"""This is called when raw data is received from the stream.
This method handles sending the data to other methods based on the
message type.
Parameters
----------
raw_data : JSON
The raw data from the stream
"""
on_status 則是這樣使用的:在這個函式中,引數是 Status 物件,你可以存取它的欄位,例如 status.text:
def on_status(self, status):
"""This is called when a status is received.
Parameters
----------
status : Status
The Status received
"""
這些函式的名稱在不同版本的 tweepy 中可能會改變;請閱讀這篇文章,了解如何安裝指定版本的 Python 套件。
uj5u.com熱心網友回復:
我已經編輯了代碼,修復了評論中指出的錯誤。我的 def on_status(self, status) 是基於以前對我有用的示例寫的(在那種情況下,我錯誤地把 self 替換成了 data)。
連接已創建,我收到了該訊息,但 10 秒后我收到以下錯誤 TypeError: 'str' object is not callable
完整的錯誤跟蹤是:
TypeError Traceback (most recent call last)
<ipython-input-14-249caad94bfb> in <module>
4 tweet_mode="extended")
5 #Filtering by spatial box and keywords
----> 6 myStream.filter(locations=[-10.78,34.15, 5.95,44.04], track=['Covid','covid-19'])
~\Anaconda3\lib\site-packages\tweepy\streaming.py in filter(self, follow, track, is_async, locations, stall_warnings, languages, encoding, filter_level)
472 self.body['filter_level'] = filter_level.encode(encoding)
473 self.session.params = {'delimited': 'length'}
--> 474 self._start(is_async)
475
476 def sitestream(self, follow, stall_warnings=False,
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _start(self, is_async)
387 self._thread.start()
388 else:
--> 389 self._run()
390
391 def on_closed(self, resp):
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _run(self)
318 # call a handler first so that the exception can be logged.
319 self.listener.on_exception(exc_info[1])
--> 320 six.reraise(*exc_info)
321
322 def _data(self, data):
~\Anaconda3\lib\site-packages\six.py in reraise(tp, value, tb)
691 if value.__traceback__ is not tb:
692 raise value.with_traceback(tb)
--> 693 raise value
694 finally:
695 value = None
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _run(self)
287 self.snooze_time = self.snooze_time_step
288 self.listener.on_connect()
--> 289 self._read_loop(resp)
290 except (Timeout, ssl.SSLError) as exc:
291 # This is still necessary, as a SSLError can actually be
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _read_loop(self, resp)
349 next_status_obj = buf.read_len(length)
350 if self.running and next_status_obj:
--> 351 self._data(next_status_obj)
352
353 # # Note: keep-alive newlines might be inserted before each length value.
~\Anaconda3\lib\site-packages\tweepy\streaming.py in _data(self, data)
321
322 def _data(self, data):
--> 323 if self.listener.on_data(data) is False:
324 self.running = False
325
~\Anaconda3\lib\site-packages\tweepy\streaming.py in on_data(self, raw_data)
52 if 'in_reply_to_status_id' in data:
53 status = Status.parse(self.api, data)
---> 54 if self.on_status(status) is False:
55 return False
56 elif 'delete' in data:
<ipython-input-12-3460245af936> in on_status(self, status)
34 if not hasattr(status, "retweeted_status") and coordinates!= None:
35 # Connect to database
---> 36 dbConnect(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates)
37
38 if (time.time() - self.start_time) > self.limit:
<ipython-input-13-d7acfb1cce67> in dbConnect(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates)
6
7 command = "INSERT INTO tweets (tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates) VALUES (%s,%s,%s,%s,%s,%s,%s,%s)"
----> 8 cur.execute(command(tweet_id, user_id, username, tweet, hashtags, lang, created_at, coordinates))
9
10 conn.commit()
TypeError: 'str' object is not callable
我不確定,但現在似乎錯誤來自我將推文插入PostgreSQL 的行。
我現在已經編輯了代碼,添加了函式 def on_data(self, raw_data),并按照評論中提到的方式放置了 def on_status(self, status)。但我仍然收到錯誤 TypeError: 'str' object is not callable
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/343260.html
標籤:Python PostgreSQL 推特 jupyter-笔记本 twitterapi-python
上一篇:帶有外鍵的Django中的Querysetfilter()
下一篇:修復SQL觸發器語法
