PYTHON调用ZABBIX的API接口完成部份日常的运维工作
登陆
·
环境:PYTHON2.7
导入一些准备用到的模块
import json
import urllib2
from urllib2 import URLError
import sys
import xlrd
import getpass
import time
import datetime
import xlsxwriter
reload(sys)
sys.setdefaultencoding('utf-8')
登陆API
class ZabbixTools:
def __init__(self):
self.url = 'http://192.168.243.99//zabbix/api_jsonrpc.php'
self.header = {'User-agent' : 'Mozilla/5.0 (Windows NT 6.2; WOW64; rv:22.0) Gecko/20100101 Firefox/22.0',"Content-Type":"application/json"}
def user_login(self):
data = json.dumps({
"jsonrpc": "2.0",
"method": "user.login",
"params": {
"user": "Admin",
"password": "zabbix"
},
"id": 0
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print ("Auth Failed, please Check your name and password:", e.code)
else:
response = json.loads(result.read())
result.close()
self.authID = response['result']
return self.authID
根据主机名称获取被监控主机的信息,主机ID
def host_get(self,hostName):
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.get",
"params":{
"output":["hostid","name","status","available"],
"filter":{"host":hostName}
},
"auth":self.user_login(),
"id":1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
#print "Number Of %s: " % hostName, len(response['result'])
lens=len(response['result'])
if lens > 0:
print response['result'][0]['available'] +" "+response['result'][0]['name']
return response['result'][0]['hostid']
else:
return "error"
根据主机IP获取被监控主机的信息
def host_getbyip(self,hostip):
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.get",
"params":{
"output":["hostid","name","status","available"],
"filter": {"ip": hostip}
},
"auth":self.user_login(),
"id":1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
#print "Number Of %s: " % hostName, len(response['result'])
lens=len(response['result'])
if lens > 0:
print response['result'][0]['available'] +" "+response['result'][0]['name'] + " "+response['result'][0]['hostid']
return response['result'][0]['hostid']
else:
return "error"
根据主机组的名称获取主机组ID
def hostgroup_get(self, hostgroupName):
data = json.dumps({
"jsonrpc":"2.0",
"method":"hostgroup.get",
"params":{
"output": "extend",
"filter": {
"name": [
hostgroupName,
]
}
},
"auth":self.user_login(),
"id":1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
response = json.loads(result.read())
result.close()
lens=len(response['result'])
if lens > 0:
#self.hostgroupID = response['result'][0]['groupid']
return response['result'][0]['groupid']
else:
print "no GroupGet result"
return ""
获取主机组对应的全部主机
def gethostbygroupname(self,hostgroupname):
groupid=self.hostgroup_get(hostgroupname)
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.get",
"params":{
"output":["hostid","name","description"],
"groupids":groupid,
},
"auth":self.authID,
"id":1,
})
request = urllib2.Request(self.url,data)
for key in self.header:
request.add_header(key,self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
print "Number Of Hosts: ", len(response['result'])
for host in response['result']:
hostip=self.gethostipbyhostid(host['hostid'])
print "Host ID:",host['hostid'],"HostName:",host['name'],hostip
获取指定代理服务器下面全部的主机
def proxy_get(self, ProxyName):
data = json.dumps({
"jsonrpc":"2.0",
"method": "proxy.get",
"params": {
"output": "extend",
"selectInterface": "extend",
"filter": {
"host": [ ProxyName, ]
}
},
"auth":self.authID,
"id":1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
response = json.loads(result.read())
result.close()
self.templateID = response['result'][0]['proxyid']
return response['result'][0]['proxyid']
def gethostbyproxyname(self,proxyname):
proxyids=self.proxy_get(proxyname)
print proxyids
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.get",
"params":{
"output":["hostid","name","description"],
"filter": {
"proxy_hostid":proxyids,
}
},
"auth":self.authID,
"id":1,
})
request = urllib2.Request(self.url,data)
for key in self.header:
request.add_header(key,self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
print "Number Of Hosts: ", len(response['result'])
for host in response['result']:
hostip=self.gethostipbyhostid(host['hostid'])
print "Host ID:",host['hostid'],"HostName:",host['name'],"ip:",hostip
获取全部的主机组
def printgroups(self):
data = json.dumps({
"jsonrpc":"2.0",
"method":"hostgroup.get",
"params":{
"output":["groupid","name"],
},
"auth":self.user_login(), # theauth id is what auth script returns, remeber it is string
"id":1,
})
request = urllib2.Request(self.url,data)
for key in self.header:
request.add_header(key,self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
print "Number Of Hosts: ", len(response['result'])
#print response
for group in response['result']:
print "GroupID:",group['groupid'],"\tGroupName:",group['name']
获取指定模板的全部主机
def template_get(self, templateName):
data = json.dumps({
"jsonrpc":"2.0",
"method": "template.get",
"params": {
"output": "extend",
"filter": {
"host": [
templateName,
]
}
},
"auth":self.authID,
"id":1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
response = json.loads(result.read())
result.close()
self.templateID = response['result'][0]['templateid']
return response['result'][0]['templateid']
def get_more_tempinfo(self,templatenames):
templateids=self.template_get(templatenames)
data = json.dumps({
"jsonrpc": "2.0",
"method": "template.get",
"params": {
"output": ['host', 'templateid'],
"templateids": [templateids],
"selectGroups": [ # 1.返回模板所属的主机组
"name",
"groupid"
],
"selectHosts": [ # 2.返回链接到模板的主机
"name",
"hostid"
],
"selectItems": [ # 3.返回模板中的监控项
"name",
"key_",
"itemid",
"interfaceid",
"delay",
"type",
"description",
"snmp_oid"
],
"selectTriggers": [ # 4.返回模板中的触发器
"description",
"triggerid",
],
"selectTemplates": [ # 5.返回此模板被哪些模板链接了
"host",
"templateid"
],
"selectParentTemplates": [ # 6.返回此模板链接了哪些模板
"host",
"templateid"
],
},
"auth": self.authID,
"id": 1
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print
"Error as ", e
else:
response = json.loads(result.read())
result.close()
for item11 in response['result'][0]['items']:
print item11["name"]+"@"+item11["key_"]+"@"+item11["delay"]+"@"+item11["description"]+"@"+item11["type"]
print item11["snmp_oid"]
for hosts11 in response['result'][0]['hosts']:
print hosts11["name"]
修改主机的显示名
def host_updatevisibleName(self, hostNames,visibleName):
hostid=self.host_get(hostNames)
data = json.dumps({
"jsonrpc":"2.0",
"method":"host.update",
"params":{
"hostid": hostid,
"name": visibleName
},
"auth": self.user_login(),
"id":1
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
result.close()
print "update" + hostNames + " okay"
获取代理的ID并更新主机的代理
def host_updateproxyid(self,hostNames,proxyids):
hostid=self.host_get(hostNames)
data = json.dumps({
"jsonrpc":"2.0",
"method":"proxy.update",
"params":{
"proxyid": proxyids,
"hosts": [
hostid
]
},
"auth": self.user_login(),
"id":1
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
result.close()
print "update" + hostNames + " okay"
def proxy_get(self, ProxyName):
data = json.dumps({
"jsonrpc":"2.0",
"method": "proxy.get",
"params": {
"output": "extend",
"selectInterface": "extend",
"filter": {
"host": [ ProxyName, ]
}
},
#"auth":self.user_login(),
"auth":self.authID,
"id":1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
response = json.loads(result.read())
result.close()
self.templateID = response['result'][0]['proxyid']
return response['result'][0]['proxyid']
创建和修改主机的宏变量
def create_host_macros(self,hostnames,macros,values):
host_id=self.host_get(hostnames)
data = json.dumps({
"jsonrpc":"2.0",
"method":"usermacro.create",
"params":{
"hostid": host_id,
# "macro": "{$SNMP_COMMUNITYTEST}",
# "value": "publics"
"macro": macros,
"value": values
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
response = json.loads(result.read())
result.close()
print "macro is created!"
def gethostmacroid(self, hostid, macronames):
data = json.dumps({
"jsonrpc": "2.0",
"method": "usermacro.get",
"params": {
"output": "extend",
"filter": {"macro": macronames}
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
for infos in response['result']:
if hostid==infos["hostid"]:
return infos["hostmacroid"]
def updatemacro(self, hostnames, macronames, macrovalues):
#hostid = self.host_get(hostnames)
hostid = self.host_getbyip(hostnames)
hostmacroid = self.gethostmacroid(hostid, macronames)
print hostmacroid
data = json.dumps({
"jsonrpc": "2.0",
"method": "usermacro.update",
"params": {
#"hostid": hostid,
"hostmacroid": hostmacroid,
#"macro": macronames,
"value": macrovalues
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
print hostnames,macronames,macrovalues
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
print "update okay " + str(response)
启用监控项和触发器 status 0 为启用
def getitems(self,hostnames,itemnames):
hostid = self.host_get(hostnames)
data = json.dumps({
"jsonrpc": "2.0",
"method": "item.get",
"params": {
"output": "extend",
"hostids": hostid,
"filter": {
"name": itemnames,
}
# "proxy_hostid":proxyids,
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
#print response['result']
return response['result'][0]["itemid"]
def enableitems(self,hostnames,itemnames,status):
#hostid = self.host_get(hostnames)
itemids=self.getitems(hostnames,itemnames)
data = json.dumps({
"jsonrpc": "2.0",
"method": "item.update",
"params": {
"itemid": itemids,
"status": status
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
print "update okay"
def gettriggers(self,hostnames,triggernames):
hostid = self.host_get(hostnames)
data = json.dumps({
"jsonrpc": "2.0",
"method": "trigger.get",
"params": {
"output": "extend",
"hostids": hostid,
"filter": {
"description": triggernames,
}
# "proxy_hostid":proxyids,
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
#print response['result']
return response['result'][0]["triggerid"]
def enabletriggers(self,hostnames,triggersnames,status):
#hostid = self.host_get(hostnames)
triggerids=self.gettriggers(hostnames,triggersnames)
data = json.dumps({
"jsonrpc": "2.0",
"method": "trigger.update",
"params": {
"triggerid": triggerids,
"status": status
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
print "update okay"
获取主机组信息,用户组信息,告警动作对应的主机组ID和用户组ID
def gethostgroups(self):
data = json.dumps({
"jsonrpc": "2.0",
"method": "hostgroup.get",
"params": {
"output": "extend",
"status": 0
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
print response['result']
#print str(response['result'][0]['name'])
for items in response['result']:
print items["name"],items["groupid"]
def getgroupsusers(self):
data = json.dumps({
"jsonrpc": "2.0",
"method": "usergroup.get",
"params": {
"output": "extend",
#"userids": "extend",
"selectUsers":"extend",
"selectTagFilters":"extend",
#"status": 0
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
for items in response['result']:
if len(items["users"])>0:
for rlists in items["users"]:
print rlists
print items["name"],rlists["alias"],rlists["userid"]
def getactions(self):
data = json.dumps({
"jsonrpc": "2.0",
"method": "action.get",
"params": {
"output": "extend",
"selectOperations": "extend",
"selectRecoveryOperations": "extend",
"selectFilter": "extend"
},
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
if hasattr(e, 'reason'):
print 'We failed to reach a server.'
print 'Reason: ', e.reason
elif hasattr(e, 'code'):
print 'The server could not fulfill the request.'
print 'Error code: ', e.code
else:
response = json.loads(result.read())
result.close()
for actins in response['result']:
if int(actins["status"])==0 and "opmessage_grp" in actins["operations"][0].keys() and len(actins["operations"][0]["opmessage_grp"])>0:
for gps in actins["filter"]['conditions']:
if int(gps['conditiontype'])==0:
print actins['name'],gps["value"],actins["operations"][0]["opmessage_grp"][0]['usrgrpid']
将每一个ZABBIX的PROXY对应的主机输出至EXCEL表格
def printproxy(self):
data = json.dumps({
"jsonrpc": "2.0",
"method": "proxy.get",
"params": {
"output": "extend",
"selectInterface": "extend",
},
# "auth":self.user_login(),
"auth": self.authID,
"id": 1,
})
request = urllib2.Request(self.url, data)
for key in self.header:
request.add_header(key, self.header[key])
try:
result = urllib2.urlopen(request)
except URLError as e:
print "Error as ", e
else:
response = json.loads(result.read())
result.close()
nowtime = time.strftime('%Y-%m-%d-%H-%M-%S', time.localtime(time.time())) + ""
filenames = nowtime + 'printproxy.xlsx'
workbook1 = xlsxwriter.Workbook(filenames)
worksheet = workbook1.add_worksheet()
t1 = 'ZABBIX PROXY 客户端主机的分类'
format = workbook1.add_format()
format.set_bold()
yellow = workbook1.add_format(
{'align': 'center', 'valign': 'vcenter', 'font_size': 22, 'fg_color': 'FFC1C1'})
yellow.set_bold()
# worksheet.merge_range(0,0,0,4,t1,yellow)
worksheet.merge_range('A1:B1', t1, yellow)
worksheet.set_row(0, 38)
worksheet.set_column("A:A", 30)
worksheet.set_column("B:B", 60)
title = [u'PROXY主机', u'AGENTIP',u'AGENTNAME',u'AGENT其它信息']
format = workbook1.add_format()
# worksheet.set_column(0,15,20)
format = workbook1.add_format({'align': 'center', 'valign': 'vcenter'})
format.set_bold()
worksheet.write_row('A2', title, format)
worksheet.set_row(1, 25)
row = 2
for pxys in response['result']:
resluts = self.gethostbyproxidreturn(pxys['proxyid'])
pxyinfos=pxys['host']+" "+pxys['proxy_address']+ " 节点数量 "+str(len(resluts))
print pxys['host'], "\tProxyipaddress:", pxys['proxy_address'], "节点数量:" + str(len(resluts))
if len(resluts) > 0:
worksheet.write(row,0,pxyinfos)
rows = row
for rlist in resluts:
hostsips = self.gethostipbyhostid(rlist['hostid'])
print hostsips, rlist['name'], rlist['description']
worksheet.write(rows, 1, hostsips)
worksheet.write(rows, 2, rlist['name'])
worksheet.write(rows, 2, rlist['description'])
rows = rows + 1
worksheet.merge_range(row, 0, row + len(resluts) - 1, 0,pxyinfos)
print row+len(resluts)
row = row + len(resluts)
print "-----------------------------------------------"
print "-----------------------------------------------"
workbook1.close()
简单的调用方式
if __name__ == "__main__":
test = ZabbixTools()
test.user_login()
test.printproxy()
更多推荐




所有评论(0)